This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

The following commit(s) were added to refs/heads/master by this push:
     new b8aeb79  ARROW-854: [Format] Add tentative SparseTensor format
b8aeb79 is described below

commit b8aeb79e94a5a507aeec55d0b6c6bf5d7f0100b2
Author: Kenta Murata <m...@mrkn.jp>
AuthorDate: Wed Jan 9 16:18:19 2019 -0600

ARROW-854: [Format] Add tentative SparseTensor format

I'm interested in creating a language-agnostic sparse tensor format. I believe Apache Arrow is one of the suitable places to do this, so let me propose my idea here.

First of all, my investigation found that there is no common memory layout for sparse tensor representations. This means some kind of conversion is needed to share sparse tensors among different systems, even when the data format is logically the same. This is the same situation as with dataframes, which is why I believe Apache Arrow is a suitable place for it.

There are many formats for representing a sparse tensor. Most of them are specialized for matrices, which have two dimensions; few formats handle general sparse tensors with more than two dimensions.

I think the COO format is a suitable starting point because COO can handle any number of dimensions and many systems support it. In my investigation, the systems that support COO are SciPy, dask, pydata/sparse, TensorFlow, and PyTorch.

Additionally, it may be good to support the CSR format for matrices from the start. CSR is efficient for extracting row slices, which may be important for extracting samples from tidy data, and it is supported by SciPy, MXNet, and R's Matrix library.

This pull request adds my prototype definition of the SparseTensor format. I designed the prototype to be extensible so that additional sparse formats can be supported later. Besides COO, I think we will at least need another sparse tensor format for more than two dimensions, so this extensibility is necessary.
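The difference between the COO and CSR layouts described in the message can be sketched with plain standard-library containers. This is a minimal illustration under stated assumptions, not Arrow's implementation: the names `CooTensor`, `CsrMatrix`, `DenseToCoo`, and `DenseToCsr` are hypothetical, the input is assumed to be a dense row-major buffer of `int64_t`, and zero is treated as the implicit (unstored) value.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical types for illustration only; these are not Arrow classes.
struct CooTensor {
  std::vector<int64_t> shape;
  // One coordinate tuple (ndim entries) per non-zero value, in row-major order.
  std::vector<std::vector<int64_t>> coords;
  std::vector<int64_t> values;
};

struct CsrMatrix {
  int64_t nrows = 0;
  int64_t ncols = 0;
  // nrows + 1 entries; row i's non-zeros occupy positions [indptr[i], indptr[i + 1]).
  std::vector<int64_t> indptr;
  std::vector<int64_t> indices;  // column index of each non-zero value
  std::vector<int64_t> values;   // non-zero values, row by row
};

// COO works for any number of dimensions: record the full coordinate of every non-zero.
CooTensor DenseToCoo(const std::vector<int64_t>& dense, const std::vector<int64_t>& shape) {
  CooTensor out;
  out.shape = shape;
  const std::size_t ndim = shape.size();
  for (std::size_t flat = 0; flat < dense.size(); ++flat) {
    if (dense[flat] == 0) continue;  // zeros are implicit and not stored
    // Unravel the row-major flat offset into one coordinate per dimension.
    std::vector<int64_t> coord(ndim);
    int64_t rem = static_cast<int64_t>(flat);
    for (std::size_t d = ndim; d-- > 0;) {
      coord[d] = rem % shape[d];
      rem /= shape[d];
    }
    out.coords.push_back(coord);
    out.values.push_back(dense[flat]);
  }
  return out;
}

// CSR is matrix-only: the row coordinate is compressed into the indptr offsets array.
CsrMatrix DenseToCsr(const std::vector<int64_t>& dense, int64_t nrows, int64_t ncols) {
  assert(static_cast<int64_t>(dense.size()) == nrows * ncols);
  CsrMatrix out;
  out.nrows = nrows;
  out.ncols = ncols;
  out.indptr.push_back(0);
  for (int64_t i = 0; i < nrows; ++i) {
    for (int64_t j = 0; j < ncols; ++j) {
      const int64_t v = dense[i * ncols + j];
      if (v != 0) {
        out.indices.push_back(j);
        out.values.push_back(v);
      }
    }
    // Row i ends where the next row begins.
    out.indptr.push_back(static_cast<int64_t>(out.values.size()));
  }
  return out;
}
```

For the 4x6 matrix used in this commit's CSR round-trip test, `DenseToCsr` produces `indptr = {0, 3, 6, 9, 12}` and `indices = {0, 2, 5, 1, 2, 4, 1, 3, 4, 0, 3, 5}`. Every row holds three non-zeros, and extracting row `i` is a single contiguous slice `[indptr[i], indptr[i + 1])`, which is the row-slicing advantage the message attributes to CSR. COO, by contrast, stores a full coordinate per value and so applies unchanged to the 2x3x4 tensor used in the COO test.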
Author: Kenta Murata <m...@mrkn.jp> Closes #2546 from mrkn/sparse_tensor_proposal and squashes the following commits: 148bff822 <Kenta Murata> make format d57e56fc6 <Kenta Murata> Merge sparse_tensor_format.h into sparse_tensor.h 880bbc4eb <Kenta Murata> Rename too-verbose function name c83ea6aaf <Kenta Murata> Add type aliases of sparse tensor types 90e8b3166 <Kenta Murata> Rename sparse tensor classes 07a651863 <Kenta Murata> Use substitution instead of constructor call 37a0a14c6 <Kenta Murata> Remove needless function declaration 97e85bd35 <Kenta Murata> Use std::make_shared 3dd434c83 <Kenta Murata> Capitalize member function name 6ef6ad065 <Kenta Murata> Apply code formatter 6f291581e <Kenta Murata> Mark APIs for sparse tensor as EXPERIMENTAL ff3ea71c5 <Kenta Murata> Rename length to non_zero_length in SparseTensor f78230344 <Kenta Murata> Return Status::IOError instead of DCHECK if message header type is not matched 7e814de36 <Kenta Murata> Put EXPERIMENTAL markn in comments 357860d8c <Kenta Murata> Fix typo in comments 43d8eea44 <Kenta Murata> Fix coding style 99b1d1d4d <Kenta Murata> Add missing ARROW_EXPORT specifiers 401ae8023 <Kenta Murata> Fix SparseCSRIndex::ToString and add tests 9e457acd3 <Kenta Murata> Remove needless virtual specifiers 3b1db7d32 <Kenta Murata> Add SparseTensorBase::Equals d6a8c3805 <Kenta Murata> Unify Tensor.fbs and SparseTensor.fbs b3a62ebfa <Kenta Murata> Fix format 6bc9e296f <Kenta Murata> Support IPC read and write of SparseTensor 1d9042709 <Kenta Murata> Fix format 51a83bfee <Kenta Murata> Add SparseTensorFormat 93c03adad <Kenta Murata> Add SparseIndex::ToString() 021b46be0 <Kenta Murata> Add SparseTensorBase ed3984dd4 <Kenta Murata> Add SparseIndex::format_type 4251b4d08 <Kenta Murata> Add SparseCSRIndex 433c9b441 <Kenta Murata> Change COO index matrix to column-major in a format description 392a25b7c <Kenta Murata> Implement SparseTensor and SparseCOOIndex b24f3c342 <Kenta Murata> Insert additional padding in sparse tensor 
format c508db086 <Kenta Murata> Write sparse tensor format in IPC.md 2b50040f5 <Kenta Murata> Add an example of the CSR format in comment 76c56dd35 <Kenta Murata> Make indptr of CSR a buffer d7e653f17 <Kenta Murata> Add an example of COO format in comment 866b2c13a <Kenta Murata> Add header comments in SparseTensor.fbs aa9b8a4d0 <Kenta Murata> Add SparseTensor.fbs in FBS_SRC 1f16ffed8 <Kenta Murata> Fix syntax error in SparseTensor.fbs c3bc6edfa <Kenta Murata> Add tentative SparseTensor format --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/compare.cc | 93 +++++++ cpp/src/arrow/compare.h | 4 + cpp/src/arrow/ipc/message.cc | 2 + cpp/src/arrow/ipc/message.h | 2 +- cpp/src/arrow/ipc/metadata-internal.cc | 148 +++++++++++ cpp/src/arrow/ipc/metadata-internal.h | 12 + cpp/src/arrow/ipc/read-write-test.cc | 112 ++++++++ cpp/src/arrow/ipc/reader.cc | 119 +++++++++ cpp/src/arrow/ipc/reader.h | 17 ++ cpp/src/arrow/ipc/writer.cc | 101 ++++++++ cpp/src/arrow/ipc/writer.h | 15 ++ cpp/src/arrow/sparse_tensor-test.cc | 244 ++++++++++++++++++ cpp/src/arrow/sparse_tensor.cc | 452 +++++++++++++++++++++++++++++++++ cpp/src/arrow/sparse_tensor.h | 211 +++++++++++++++ cpp/src/arrow/tensor.h | 6 + docs/source/format/IPC.rst | 24 ++ format/Message.fbs | 4 +- format/Tensor.fbs | 96 +++++++ 19 files changed, 1661 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index f2a8112..91bdce2 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -83,6 +83,7 @@ set(ARROW_SRCS table.cc table_builder.cc tensor.cc + sparse_tensor.cc type.cc visitor.cc @@ -286,6 +287,7 @@ ADD_ARROW_TEST(type-test) ADD_ARROW_TEST(table-test) ADD_ARROW_TEST(table_builder-test) ADD_ARROW_TEST(tensor-test) +ADD_ARROW_TEST(sparse_tensor-test) ADD_ARROW_BENCHMARK(builder-benchmark) ADD_ARROW_BENCHMARK(column-benchmark) diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc index efc8ad8..1147529 100644 --- a/cpp/src/arrow/compare.cc 
+++ b/cpp/src/arrow/compare.cc @@ -30,6 +30,7 @@ #include "arrow/array.h" #include "arrow/buffer.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/type.h" @@ -782,6 +783,98 @@ bool TensorEquals(const Tensor& left, const Tensor& right) { return are_equal; } +namespace { + +template <typename LeftSparseIndexType, typename RightSparseIndexType> +struct SparseTensorEqualsImpl { + static bool Compare(const SparseTensorImpl<LeftSparseIndexType>& left, + const SparseTensorImpl<RightSparseIndexType>& right) { + // TODO(mrkn): should we support the equality among different formats? + return false; + } +}; + +template <typename SparseIndexType> +struct SparseTensorEqualsImpl<SparseIndexType, SparseIndexType> { + static bool Compare(const SparseTensorImpl<SparseIndexType>& left, + const SparseTensorImpl<SparseIndexType>& right) { + DCHECK(left.type()->id() == right.type()->id()); + DCHECK(left.shape() == right.shape()); + DCHECK(left.non_zero_length() == right.non_zero_length()); + + const auto& left_index = checked_cast<const SparseIndexType&>(*left.sparse_index()); + const auto& right_index = checked_cast<const SparseIndexType&>(*right.sparse_index()); + + if (!left_index.Equals(right_index)) { + return false; + } + + const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type()); + const int byte_width = size_meta.bit_width() / CHAR_BIT; + DCHECK_GT(byte_width, 0); + + const uint8_t* left_data = left.data()->data(); + const uint8_t* right_data = right.data()->data(); + + return memcmp(left_data, right_data, + static_cast<size_t>(byte_width * left.non_zero_length())) == 0; + } +}; + +template <typename SparseIndexType> +inline bool SparseTensorEqualsImplDispatch(const SparseTensorImpl<SparseIndexType>& left, + const SparseTensor& right) { + switch (right.format_id()) { + case SparseTensorFormat::COO: { + const auto& right_coo = + checked_cast<const SparseTensorImpl<SparseCOOIndex>&>(right); + return
SparseTensorEqualsImpl<SparseIndexType, SparseCOOIndex>::Compare(left, + right_coo); + } + + case SparseTensorFormat::CSR: { + const auto& right_csr = + checked_cast<const SparseTensorImpl<SparseCSRIndex>&>(right); + return SparseTensorEqualsImpl<SparseIndexType, SparseCSRIndex>::Compare(left, + right_csr); + } + + default: + return false; + } +} + +} // namespace + +bool SparseTensorEquals(const SparseTensor& left, const SparseTensor& right) { + if (&left == &right) { + return true; + } else if (left.type()->id() != right.type()->id()) { + return false; + } else if (left.size() == 0) { + return true; + } else if (left.shape() != right.shape()) { + return false; + } else if (left.non_zero_length() != right.non_zero_length()) { + return false; + } + + switch (left.format_id()) { + case SparseTensorFormat::COO: { + const auto& left_coo = checked_cast<const SparseTensorImpl<SparseCOOIndex>&>(left); + return SparseTensorEqualsImplDispatch(left_coo, right); + } + + case SparseTensorFormat::CSR: { + const auto& left_csr = checked_cast<const SparseTensorImpl<SparseCSRIndex>&>(left); + return SparseTensorEqualsImplDispatch(left_csr, right); + } + + default: + return false; + } +} + bool TypeEquals(const DataType& left, const DataType& right) { bool are_equal; // The arrays are the same object diff --git a/cpp/src/arrow/compare.h b/cpp/src/arrow/compare.h index 21e2fdc..d49d7cc 100644 --- a/cpp/src/arrow/compare.h +++ b/cpp/src/arrow/compare.h @@ -29,12 +29,16 @@ namespace arrow { class Array; class DataType; class Tensor; +class SparseTensor; /// Returns true if the arrays are exactly equal bool ARROW_EXPORT ArrayEquals(const Array& left, const Array& right); bool ARROW_EXPORT TensorEquals(const Tensor& left, const Tensor& right); +/// EXPERIMENTAL: Returns true if the given sparse tensors are exactly equal +bool ARROW_EXPORT SparseTensorEquals(const SparseTensor& left, const SparseTensor& right); + /// Returns true if the arrays are approximately equal. 
For non-floating point /// types, this is equivalent to ArrayEquals(left, right) bool ARROW_EXPORT ArrayApproxEquals(const Array& left, const Array& right); diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 8adf4a8..23709a4 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -63,6 +63,8 @@ class Message::MessageImpl { return Message::RECORD_BATCH; case flatbuf::MessageHeader_Tensor: return Message::TENSOR; + case flatbuf::MessageHeader_SparseTensor: + return Message::SPARSE_TENSOR; default: return Message::NONE; } diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 092a19f..760012d 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -70,7 +70,7 @@ constexpr int kMaxNestingDepth = 64; /// \brief An IPC message including metadata and body class ARROW_EXPORT Message { public: - enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR }; + enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR, SPARSE_TENSOR }; /// \brief Construct message, but do not validate /// diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 1d4c80c..da67113 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -31,6 +31,7 @@ #include "arrow/ipc/Tensor_generated.h" // IWYU pragma: keep #include "arrow/ipc/message.h" #include "arrow/ipc/util.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/type.h" @@ -50,6 +51,7 @@ using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>; using FieldOffset = flatbuffers::Offset<flatbuf::Field>; using KeyValueOffset = flatbuffers::Offset<flatbuf::KeyValue>; using RecordBatchOffset = flatbuffers::Offset<flatbuf::RecordBatch>; +using SparseTensorOffset = flatbuffers::Offset<flatbuf::SparseTensor>; using Offset = flatbuffers::Offset<void>; using FBString = 
flatbuffers::Offset<flatbuffers::String>; @@ -781,6 +783,106 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, body_length, out); } +Status MakeSparseTensorIndexCOO(FBB& fbb, const SparseCOOIndex& sparse_index, + const std::vector<BufferMetadata>& buffers, + flatbuf::SparseTensorIndex* fb_sparse_index_type, + Offset* fb_sparse_index, size_t* num_buffers) { + *fb_sparse_index_type = flatbuf::SparseTensorIndex_SparseTensorIndexCOO; + const BufferMetadata& indices_metadata = buffers[0]; + flatbuf::Buffer indices(indices_metadata.offset, indices_metadata.length); + *fb_sparse_index = flatbuf::CreateSparseTensorIndexCOO(fbb, &indices).Union(); + *num_buffers = 1; + return Status::OK(); +} + +Status MakeSparseMatrixIndexCSR(FBB& fbb, const SparseCSRIndex& sparse_index, + const std::vector<BufferMetadata>& buffers, + flatbuf::SparseTensorIndex* fb_sparse_index_type, + Offset* fb_sparse_index, size_t* num_buffers) { + *fb_sparse_index_type = flatbuf::SparseTensorIndex_SparseMatrixIndexCSR; + const BufferMetadata& indptr_metadata = buffers[0]; + const BufferMetadata& indices_metadata = buffers[1]; + flatbuf::Buffer indptr(indptr_metadata.offset, indptr_metadata.length); + flatbuf::Buffer indices(indices_metadata.offset, indices_metadata.length); + *fb_sparse_index = flatbuf::CreateSparseMatrixIndexCSR(fbb, &indptr, &indices).Union(); + *num_buffers = 2; + return Status::OK(); +} + +Status MakeSparseTensorIndex(FBB& fbb, const SparseIndex& sparse_index, + const std::vector<BufferMetadata>& buffers, + flatbuf::SparseTensorIndex* fb_sparse_index_type, + Offset* fb_sparse_index, size_t* num_buffers) { + switch (sparse_index.format_id()) { + case SparseTensorFormat::COO: + RETURN_NOT_OK(MakeSparseTensorIndexCOO( + fbb, checked_cast<const SparseCOOIndex&>(sparse_index), buffers, + fb_sparse_index_type, fb_sparse_index, num_buffers)); + break; + + case SparseTensorFormat::CSR: + RETURN_NOT_OK(MakeSparseMatrixIndexCSR( + fbb, checked_cast<const 
SparseCSRIndex&>(sparse_index), buffers, + fb_sparse_index_type, fb_sparse_index, num_buffers)); + break; + + default: + std::stringstream ss; + ss << "Unsupported sparse tensor format: " << sparse_index.ToString() + << std::endl; + return Status::NotImplemented(ss.str()); + } + + return Status::OK(); +} + +Status MakeSparseTensor(FBB& fbb, const SparseTensor& sparse_tensor, int64_t body_length, + const std::vector<BufferMetadata>& buffers, + SparseTensorOffset* offset) { + flatbuf::Type fb_type_type; + Offset fb_type; + RETURN_NOT_OK( + TensorTypeToFlatbuffer(fbb, *sparse_tensor.type(), &fb_type_type, &fb_type)); + + using TensorDimOffset = flatbuffers::Offset<flatbuf::TensorDim>; + std::vector<TensorDimOffset> dims; + for (int i = 0; i < sparse_tensor.ndim(); ++i) { + FBString name = fbb.CreateString(sparse_tensor.dim_name(i)); + dims.push_back(flatbuf::CreateTensorDim(fbb, sparse_tensor.shape()[i], name)); + } + + auto fb_shape = fbb.CreateVector(dims); + + flatbuf::SparseTensorIndex fb_sparse_index_type; + Offset fb_sparse_index; + size_t num_index_buffers = 0; + RETURN_NOT_OK(MakeSparseTensorIndex(fbb, *sparse_tensor.sparse_index(), buffers, + &fb_sparse_index_type, &fb_sparse_index, + &num_index_buffers)); + + const BufferMetadata& data_metadata = buffers[num_index_buffers]; + flatbuf::Buffer data(data_metadata.offset, data_metadata.length); + + const int64_t non_zero_length = sparse_tensor.non_zero_length(); + + *offset = + flatbuf::CreateSparseTensor(fbb, fb_type_type, fb_type, fb_shape, non_zero_length, + fb_sparse_index_type, fb_sparse_index, &data); + + return Status::OK(); +} + +Status WriteSparseTensorMessage(const SparseTensor& sparse_tensor, int64_t body_length, + const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out) { + FBB fbb; + SparseTensorOffset fb_sparse_tensor; + RETURN_NOT_OK( + MakeSparseTensor(fbb, sparse_tensor, body_length, buffers, &fb_sparse_tensor)); + return WriteFBMessage(fbb,
flatbuf::MessageHeader_SparseTensor, + fb_sparse_tensor.Union(), body_length, out); +} + Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length, const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, @@ -933,6 +1035,52 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type return TypeFromFlatbuffer(tensor->type_type(), tensor->type(), {}, type); } +Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type, + std::vector<int64_t>* shape, + std::vector<std::string>* dim_names, + int64_t* non_zero_length, + SparseTensorFormat::type* sparse_tensor_format_id) { + auto message = flatbuf::GetMessage(metadata.data()); + if (message->header_type() != flatbuf::MessageHeader_SparseTensor) { + return Status::IOError("Header of flatbuffer-encoded Message is not SparseTensor."); + } + if (message->header() == nullptr) { + return Status::IOError("Header-pointer of flatbuffer-encoded Message is null."); + } + + auto sparse_tensor = reinterpret_cast<const flatbuf::SparseTensor*>(message->header()); + int ndim = static_cast<int>(sparse_tensor->shape()->size()); + + for (int i = 0; i < ndim; ++i) { + auto dim = sparse_tensor->shape()->Get(i); + + shape->push_back(dim->size()); + auto fb_name = dim->name(); + if (fb_name == 0) { + dim_names->push_back(""); + } else { + dim_names->push_back(fb_name->str()); + } + } + + *non_zero_length = sparse_tensor->non_zero_length(); + + switch (sparse_tensor->sparseIndex_type()) { + case flatbuf::SparseTensorIndex_SparseTensorIndexCOO: + *sparse_tensor_format_id = SparseTensorFormat::COO; + break; + + case flatbuf::SparseTensorIndex_SparseMatrixIndexCSR: + *sparse_tensor_format_id = SparseTensorFormat::CSR; + break; + + default: + return Status::Invalid("Unrecognized sparse index type"); + } + + return TypeFromFlatbuffer(sparse_tensor->type_type(), sparse_tensor->type(), {}, type); +} + // 
---------------------------------------------------------------------- // Implement message writing diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index 152ca13..6562382 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -33,6 +33,7 @@ #include "arrow/ipc/dictionary.h" // IYWU pragma: keep #include "arrow/ipc/message.h" #include "arrow/memory_pool.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" namespace arrow { @@ -40,6 +41,7 @@ namespace arrow { class DataType; class Schema; class Tensor; +class SparseTensor; namespace flatbuf = org::apache::arrow::flatbuf; @@ -103,6 +105,12 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type std::vector<int64_t>* shape, std::vector<int64_t>* strides, std::vector<std::string>* dim_names); +// EXPERIMENTAL: Extracting metadata of a sparse tensor from the message +Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type, + std::vector<int64_t>* shape, + std::vector<std::string>* dim_names, int64_t* length, + SparseTensorFormat::type* sparse_tensor_format_id); + /// Write a serialized message metadata with a length-prefix and padding to an /// 8-byte offset. 
Does not make assumptions about whether the stream is /// aligned already @@ -137,6 +145,10 @@ Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length, Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset, std::shared_ptr<Buffer>* out); +Status WriteSparseTensorMessage(const SparseTensor& sparse_tensor, int64_t body_length, + const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out); + Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, io::OutputStream* out); diff --git a/cpp/src/arrow/ipc/read-write-test.cc b/cpp/src/arrow/ipc/read-write-test.cc index 3a723ba..bc27386 100644 --- a/cpp/src/arrow/ipc/read-write-test.cc +++ b/cpp/src/arrow/ipc/read-write-test.cc @@ -38,6 +38,7 @@ #include "arrow/ipc/writer.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/test-util.h" @@ -844,6 +845,117 @@ TEST_F(TestTensorRoundTrip, NonContiguous) { CheckTensorRoundTrip(tensor); } +class TestSparseTensorRoundTrip : public ::testing::Test, public IpcTestFixture { + public: + void SetUp() { pool_ = default_memory_pool(); } + void TearDown() { io::MemoryMapFixture::TearDown(); } + + template <typename SparseIndexType> + void CheckSparseTensorRoundTrip(const SparseTensorImpl<SparseIndexType>& tensor) { + GTEST_FAIL(); + } +}; + +template <> +void TestSparseTensorRoundTrip::CheckSparseTensorRoundTrip<SparseCOOIndex>( + const SparseTensorImpl<SparseCOOIndex>& tensor) { + const auto& type = checked_cast<const FixedWidthType&>(*tensor.type()); + const int elem_size = type.bit_width() / 8; + + int32_t metadata_length; + int64_t body_length; + + ASSERT_OK(mmap_->Seek(0)); + + ASSERT_OK(WriteSparseTensor(tensor, mmap_.get(), &metadata_length, &body_length, + default_memory_pool())); + + const 
auto& sparse_index = checked_cast<const SparseCOOIndex&>(*tensor.sparse_index()); + const int64_t indices_length = elem_size * sparse_index.indices()->size(); + const int64_t data_length = elem_size * tensor.non_zero_length(); + const int64_t expected_body_length = indices_length + data_length; + ASSERT_EQ(expected_body_length, body_length); + + ASSERT_OK(mmap_->Seek(0)); + + std::shared_ptr<SparseTensor> result; + ASSERT_OK(ReadSparseTensor(mmap_.get(), &result)); + + const auto& resulted_sparse_index = + checked_cast<const SparseCOOIndex&>(*result->sparse_index()); + ASSERT_EQ(resulted_sparse_index.indices()->data()->size(), indices_length); + ASSERT_EQ(result->data()->size(), data_length); + ASSERT_TRUE(result->Equals(*result)); +} + +template <> +void TestSparseTensorRoundTrip::CheckSparseTensorRoundTrip<SparseCSRIndex>( + const SparseTensorImpl<SparseCSRIndex>& tensor) { + const auto& type = checked_cast<const FixedWidthType&>(*tensor.type()); + const int elem_size = type.bit_width() / 8; + + int32_t metadata_length; + int64_t body_length; + + ASSERT_OK(mmap_->Seek(0)); + + ASSERT_OK(WriteSparseTensor(tensor, mmap_.get(), &metadata_length, &body_length, + default_memory_pool())); + + const auto& sparse_index = checked_cast<const SparseCSRIndex&>(*tensor.sparse_index()); + const int64_t indptr_length = elem_size * sparse_index.indptr()->size(); + const int64_t indices_length = elem_size * sparse_index.indices()->size(); + const int64_t data_length = elem_size * tensor.non_zero_length(); + const int64_t expected_body_length = indptr_length + indices_length + data_length; + ASSERT_EQ(expected_body_length, body_length); + + ASSERT_OK(mmap_->Seek(0)); + + std::shared_ptr<SparseTensor> result; + ASSERT_OK(ReadSparseTensor(mmap_.get(), &result)); + + const auto& resulted_sparse_index = + checked_cast<const SparseCSRIndex&>(*result->sparse_index()); + ASSERT_EQ(resulted_sparse_index.indptr()->data()->size(), indptr_length); + 
ASSERT_EQ(resulted_sparse_index.indices()->data()->size(), indices_length); + ASSERT_EQ(result->data()->size(), data_length); + ASSERT_TRUE(result->Equals(*result)); +} + +TEST_F(TestSparseTensorRoundTrip, WithSparseCOOIndex) { + std::string path = "test-write-sparse-coo-tensor"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_)); + + std::vector<int64_t> shape = {2, 3, 4}; + std::vector<std::string> dim_names = {"foo", "bar", "baz"}; + std::vector<int64_t> values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + + auto data = Buffer::Wrap(values); + NumericTensor<Int64Type> t(data, shape, {}, dim_names); + SparseTensorImpl<SparseCOOIndex> st(t); + + CheckSparseTensorRoundTrip(st); +} + +TEST_F(TestSparseTensorRoundTrip, WithSparseCSRIndex) { + std::string path = "test-write-sparse-csr-matrix"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_)); + + std::vector<int64_t> shape = {4, 6}; + std::vector<std::string> dim_names = {"foo", "bar", "baz"}; + std::vector<int64_t> values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + + auto data = Buffer::Wrap(values); + NumericTensor<Int64Type> t(data, shape, {}, dim_names); + SparseTensorImpl<SparseCSRIndex> st(t); + + CheckSparseTensorRoundTrip(st); +} + TEST(TestRecordBatchStreamReader, MalformedInput) { const std::string empty_str = ""; const std::string garbage_str = "12345678"; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 59a322a..e856aca 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -38,6 +38,7 @@ #include "arrow/ipc/message.h" #include "arrow/ipc/metadata-internal.h" #include "arrow/record_batch.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/type.h" @@ -726,5 +727,123 @@ Status 
ReadTensor(const Message& message, std::shared_ptr<Tensor>* out) { return Status::OK(); } +namespace { + +Status ReadSparseCOOIndex(const flatbuf::SparseTensor* sparse_tensor, int64_t ndim, + int64_t non_zero_length, io::RandomAccessFile* file, + std::shared_ptr<SparseIndex>* out) { + auto* sparse_index = sparse_tensor->sparseIndex_as_SparseTensorIndexCOO(); + auto* indices_buffer = sparse_index->indicesBuffer(); + std::shared_ptr<Buffer> indices_data; + RETURN_NOT_OK( + file->ReadAt(indices_buffer->offset(), indices_buffer->length(), &indices_data)); + std::vector<int64_t> shape({non_zero_length, ndim}); + const int64_t elsize = sizeof(int64_t); + std::vector<int64_t> strides({elsize, elsize * non_zero_length}); + *out = std::make_shared<SparseCOOIndex>( + std::make_shared<SparseCOOIndex::CoordsTensor>(indices_data, shape, strides)); + return Status::OK(); +} + +Status ReadSparseCSRIndex(const flatbuf::SparseTensor* sparse_tensor, int64_t ndim, + int64_t non_zero_length, io::RandomAccessFile* file, + std::shared_ptr<SparseIndex>* out) { + auto* sparse_index = sparse_tensor->sparseIndex_as_SparseMatrixIndexCSR(); + + auto* indptr_buffer = sparse_index->indptrBuffer(); + std::shared_ptr<Buffer> indptr_data; + RETURN_NOT_OK( + file->ReadAt(indptr_buffer->offset(), indptr_buffer->length(), &indptr_data)); + + auto* indices_buffer = sparse_index->indicesBuffer(); + std::shared_ptr<Buffer> indices_data; + RETURN_NOT_OK( + file->ReadAt(indices_buffer->offset(), indices_buffer->length(), &indices_data)); + + std::vector<int64_t> indptr_shape({ndim + 1}); + std::vector<int64_t> indices_shape({non_zero_length}); + *out = std::make_shared<SparseCSRIndex>( + std::make_shared<SparseCSRIndex::IndexTensor>(indptr_data, indptr_shape), + std::make_shared<SparseCSRIndex::IndexTensor>(indices_data, indices_shape)); + return Status::OK(); +} + +Status MakeSparseTensorWithSparseCOOIndex( + const std::shared_ptr<DataType>& type, const std::vector<int64_t>& shape, + const 
std::vector<std::string>& dim_names, + const std::shared_ptr<SparseCOOIndex>& sparse_index, int64_t non_zero_length, + const std::shared_ptr<Buffer>& data, std::shared_ptr<SparseTensor>* out) { + *out = std::make_shared<SparseTensorImpl<SparseCOOIndex>>(sparse_index, type, data, + shape, dim_names); + return Status::OK(); +} + +Status MakeSparseTensorWithSparseCSRIndex( + const std::shared_ptr<DataType>& type, const std::vector<int64_t>& shape, + const std::vector<std::string>& dim_names, + const std::shared_ptr<SparseCSRIndex>& sparse_index, int64_t non_zero_length, + const std::shared_ptr<Buffer>& data, std::shared_ptr<SparseTensor>* out) { + *out = std::make_shared<SparseTensorImpl<SparseCSRIndex>>(sparse_index, type, data, + shape, dim_names); + return Status::OK(); +} + +} // namespace + +Status ReadSparseTensor(const Buffer& metadata, io::RandomAccessFile* file, + std::shared_ptr<SparseTensor>* out) { + std::shared_ptr<DataType> type; + std::vector<int64_t> shape; + std::vector<std::string> dim_names; + int64_t non_zero_length; + SparseTensorFormat::type sparse_tensor_format_id; + + RETURN_NOT_OK(internal::GetSparseTensorMetadata( + metadata, &type, &shape, &dim_names, &non_zero_length, &sparse_tensor_format_id)); + + auto message = flatbuf::GetMessage(metadata.data()); + auto sparse_tensor = reinterpret_cast<const flatbuf::SparseTensor*>(message->header()); + const flatbuf::Buffer* buffer = sparse_tensor->data(); + DCHECK(BitUtil::IsMultipleOf8(buffer->offset())) + << "Buffer of sparse index data " + << "did not start on 8-byte aligned offset: " << buffer->offset(); + + std::shared_ptr<Buffer> data; + RETURN_NOT_OK(file->ReadAt(buffer->offset(), buffer->length(), &data)); + + std::shared_ptr<SparseIndex> sparse_index; + switch (sparse_tensor_format_id) { + case SparseTensorFormat::COO: + RETURN_NOT_OK(ReadSparseCOOIndex(sparse_tensor, shape.size(), non_zero_length, file, + &sparse_index)); + return MakeSparseTensorWithSparseCOOIndex( + type, shape, 
dim_names, std::dynamic_pointer_cast<SparseCOOIndex>(sparse_index), + non_zero_length, data, out); + + case SparseTensorFormat::CSR: + RETURN_NOT_OK(ReadSparseCSRIndex(sparse_tensor, shape.size(), non_zero_length, file, + &sparse_index)); + return MakeSparseTensorWithSparseCSRIndex( + type, shape, dim_names, std::dynamic_pointer_cast<SparseCSRIndex>(sparse_index), + non_zero_length, data, out); + + default: + return Status::Invalid("Unsupported sparse index format"); + } +} + +Status ReadSparseTensor(const Message& message, std::shared_ptr<SparseTensor>* out) { + io::BufferReader buffer_reader(message.body()); + return ReadSparseTensor(*message.metadata(), &buffer_reader, out); +} + +Status ReadSparseTensor(io::InputStream* file, std::shared_ptr<SparseTensor>* out) { + std::unique_ptr<Message> message; + RETURN_NOT_OK(ReadContiguousPayload(file, &message)); + DCHECK_EQ(message->type(), Message::SPARSE_TENSOR); + io::BufferReader buffer_reader(message->body()); + return ReadSparseTensor(*message->metadata(), &buffer_reader, out); +} + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 942664d..ebecea1 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -33,6 +33,7 @@ class Buffer; class Schema; class Status; class Tensor; +class SparseTensor; namespace io { @@ -235,6 +236,22 @@ Status ReadTensor(io::InputStream* file, std::shared_ptr<Tensor>* out); ARROW_EXPORT Status ReadTensor(const Message& message, std::shared_ptr<Tensor>* out); +/// \brief EXPERIMENTAL: Read arrow::SparseTensor as encapsulated IPC message in file +/// +/// \param[in] file an InputStream pointed at the start of the message +/// \param[out] out the read sparse tensor +/// \return Status +ARROW_EXPORT +Status ReadSparseTensor(io::InputStream* file, std::shared_ptr<SparseTensor>* out); + +/// \brief EXPERIMENTAL: Read arrow::SparseTensor from IPC message +/// +/// \param[in] message a Message containing the
tensor metadata and body +/// \param[out] out the read sparse tensor +/// \return Status +ARROW_EXPORT +Status ReadSparseTensor(const Message& message, std::shared_ptr<SparseTensor>* out); + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 6ce72e0..0bf6814 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -21,6 +21,7 @@ #include <cstdint> #include <cstring> #include <limits> +#include <sstream> #include <vector> #include "arrow/array.h" @@ -33,6 +34,7 @@ #include "arrow/ipc/util.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/tensor.h" @@ -671,6 +673,105 @@ Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, return Status::OK(); } +namespace internal { + +class SparseTensorSerializer { + public: + SparseTensorSerializer(int64_t buffer_start_offset, IpcPayload* out) + : out_(out), buffer_start_offset_(buffer_start_offset) {} + + ~SparseTensorSerializer() = default; + + Status VisitSparseIndex(const SparseIndex& sparse_index) { + switch (sparse_index.format_id()) { + case SparseTensorFormat::COO: + RETURN_NOT_OK( + VisitSparseCOOIndex(checked_cast<const SparseCOOIndex&>(sparse_index))); + break; + + case SparseTensorFormat::CSR: + RETURN_NOT_OK( + VisitSparseCSRIndex(checked_cast<const SparseCSRIndex&>(sparse_index))); + break; + + default: + std::stringstream ss; + ss << "Unable to convert type: " << sparse_index.ToString() << std::endl; + return Status::NotImplemented(ss.str()); + } + + return Status::OK(); + } + + Status SerializeMetadata(const SparseTensor& sparse_tensor) { + return WriteSparseTensorMessage(sparse_tensor, out_->body_length, buffer_meta_, + &out_->metadata); + } + + Status Assemble(const SparseTensor& sparse_tensor) { + if (buffer_meta_.size() > 0) { + buffer_meta_.clear(); + out_->body_buffers.clear(); + } + + 
RETURN_NOT_OK(VisitSparseIndex(*sparse_tensor.sparse_index())); + out_->body_buffers.emplace_back(sparse_tensor.data()); + + int64_t offset = buffer_start_offset_; + buffer_meta_.reserve(out_->body_buffers.size()); + + for (size_t i = 0; i < out_->body_buffers.size(); ++i) { + const Buffer* buffer = out_->body_buffers[i].get(); + int64_t size = buffer->size(); + int64_t padding = BitUtil::RoundUpToMultipleOf8(size) - size; + buffer_meta_.push_back({offset, size + padding}); + offset += size + padding; + } + + out_->body_length = offset - buffer_start_offset_; + DCHECK(BitUtil::IsMultipleOf8(out_->body_length)); + + return SerializeMetadata(sparse_tensor); + } + + private: + Status VisitSparseCOOIndex(const SparseCOOIndex& sparse_index) { + out_->body_buffers.emplace_back(sparse_index.indices()->data()); + return Status::OK(); + } + + Status VisitSparseCSRIndex(const SparseCSRIndex& sparse_index) { + out_->body_buffers.emplace_back(sparse_index.indptr()->data()); + out_->body_buffers.emplace_back(sparse_index.indices()->data()); + return Status::OK(); + } + + IpcPayload* out_; + + std::vector<internal::BufferMetadata> buffer_meta_; + + int64_t buffer_start_offset_; +}; + +Status GetSparseTensorPayload(const SparseTensor& sparse_tensor, MemoryPool* pool, + IpcPayload* out) { + SparseTensorSerializer writer(0, out); + return writer.Assemble(sparse_tensor); +} + +} // namespace internal + +Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, + MemoryPool* pool) { + internal::IpcPayload payload; + internal::SparseTensorSerializer writer(0, &payload); + RETURN_NOT_OK(writer.Assemble(sparse_tensor)); + + *body_length = payload.body_length; + return internal::WriteIpcPayload(payload, dst, metadata_length); +} + Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* 
body_length, MemoryPool* pool) { diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index a1c7111..5feb9e9 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -36,6 +36,7 @@ class Schema; class Status; class Table; class Tensor; +class SparseTensor; namespace io { @@ -269,6 +270,20 @@ ARROW_EXPORT Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length); +// \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous message. The metadata, +// sparse index, and body are written assuming 64-byte alignment. It is the +// user's responsibility to ensure that the OutputStream has been aligned +// to a 64-byte multiple before writing the message. +// +// \param[in] sparse_tensor the SparseTensor to write +// \param[in] dst the OutputStream to write to +// \param[out] metadata_length the actual metadata length, including padding +// \param[out] body_length the actual message body length +ARROW_EXPORT +Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, + MemoryPool* pool); + namespace internal { // These internal APIs may change without warning or deprecation diff --git a/cpp/src/arrow/sparse_tensor-test.cc b/cpp/src/arrow/sparse_tensor-test.cc new file mode 100644 index 0000000..d48f2d0 --- /dev/null +++ b/cpp/src/arrow/sparse_tensor-test.cc @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Unit tests for SparseTensor + +#include <cstdint> +#include <memory> +#include <string> +#include <vector> + +#include <iostream> + +#include <gtest/gtest.h> + +#include "arrow/sparse_tensor.h" +#include "arrow/test-util.h" +#include "arrow/type.h" + +namespace arrow { + +static inline void CheckSparseIndexFormatType(SparseTensorFormat::type expected, + const SparseTensor& sparse_tensor) { + ASSERT_EQ(expected, sparse_tensor.format_id()); + ASSERT_EQ(expected, sparse_tensor.sparse_index()->format_id()); +} + +TEST(TestSparseCOOTensor, CreationEmptyTensor) { + std::vector<int64_t> shape = {2, 3, 4}; + SparseTensorImpl<SparseCOOIndex> st1(int64(), shape); + + std::vector<std::string> dim_names = {"foo", "bar", "baz"}; + SparseTensorImpl<SparseCOOIndex> st2(int64(), shape, dim_names); + + ASSERT_EQ(0, st1.non_zero_length()); + ASSERT_EQ(0, st2.non_zero_length()); + + ASSERT_EQ(24, st1.size()); + ASSERT_EQ(24, st2.size()); + + ASSERT_EQ("foo", st2.dim_name(0)); + ASSERT_EQ("bar", st2.dim_name(1)); + ASSERT_EQ("baz", st2.dim_name(2)); + + ASSERT_EQ("", st1.dim_name(0)); + ASSERT_EQ("", st1.dim_name(1)); + ASSERT_EQ("", st1.dim_name(2)); +} + +TEST(TestSparseCOOTensor, CreationFromNumericTensor) { + std::vector<int64_t> shape = {2, 3, 4}; + std::vector<int64_t> values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + std::shared_ptr<Buffer> buffer = Buffer::Wrap(values); + std::vector<std::string> dim_names = {"foo", "bar", "baz"}; + NumericTensor<Int64Type> tensor1(buffer, shape); 
+ NumericTensor<Int64Type> tensor2(buffer, shape, {}, dim_names); + SparseTensorImpl<SparseCOOIndex> st1(tensor1); + SparseTensorImpl<SparseCOOIndex> st2(tensor2); + + CheckSparseIndexFormatType(SparseTensorFormat::COO, st1); + + ASSERT_EQ(12, st1.non_zero_length()); + ASSERT_TRUE(st1.is_mutable()); + + ASSERT_EQ("foo", st2.dim_name(0)); + ASSERT_EQ("bar", st2.dim_name(1)); + ASSERT_EQ("baz", st2.dim_name(2)); + + ASSERT_EQ("", st1.dim_name(0)); + ASSERT_EQ("", st1.dim_name(1)); + ASSERT_EQ("", st1.dim_name(2)); + + const int64_t* ptr = reinterpret_cast<const int64_t*>(st1.raw_data()); + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 1, ptr[i]); + } + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 11, ptr[i + 6]); + } + + const auto& si = internal::checked_cast<const SparseCOOIndex&>(*st1.sparse_index()); + ASSERT_EQ(std::string("SparseCOOIndex"), si.ToString()); + + std::shared_ptr<SparseCOOIndex::CoordsTensor> sidx = si.indices(); + ASSERT_EQ(std::vector<int64_t>({12, 3}), sidx->shape()); + ASSERT_TRUE(sidx->is_column_major()); + + // (0, 0, 0) -> 1 + ASSERT_EQ(0, sidx->Value({0, 0})); + ASSERT_EQ(0, sidx->Value({0, 1})); + ASSERT_EQ(0, sidx->Value({0, 2})); + + // (0, 0, 2) -> 2 + ASSERT_EQ(0, sidx->Value({1, 0})); + ASSERT_EQ(0, sidx->Value({1, 1})); + ASSERT_EQ(2, sidx->Value({1, 2})); + + // (0, 1, 1) -> 3 + ASSERT_EQ(0, sidx->Value({2, 0})); + ASSERT_EQ(1, sidx->Value({2, 1})); + ASSERT_EQ(1, sidx->Value({2, 2})); + + // (1, 2, 1) -> 15 + ASSERT_EQ(1, sidx->Value({10, 0})); + ASSERT_EQ(2, sidx->Value({10, 1})); + ASSERT_EQ(1, sidx->Value({10, 2})); + + // (1, 2, 3) -> 16 + ASSERT_EQ(1, sidx->Value({11, 0})); + ASSERT_EQ(2, sidx->Value({11, 1})); + ASSERT_EQ(3, sidx->Value({11, 2})); +} + +TEST(TestSparseCOOTensor, CreationFromTensor) { + std::vector<int64_t> shape = {2, 3, 4}; + std::vector<int64_t> values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + std::shared_ptr<Buffer> buffer = Buffer::Wrap(values); + 
std::vector<std::string> dim_names = {"foo", "bar", "baz"}; + Tensor tensor1(int64(), buffer, shape); + Tensor tensor2(int64(), buffer, shape, {}, dim_names); + SparseTensorImpl<SparseCOOIndex> st1(tensor1); + SparseTensorImpl<SparseCOOIndex> st2(tensor2); + + ASSERT_EQ(12, st1.non_zero_length()); + ASSERT_TRUE(st1.is_mutable()); + + ASSERT_EQ("foo", st2.dim_name(0)); + ASSERT_EQ("bar", st2.dim_name(1)); + ASSERT_EQ("baz", st2.dim_name(2)); + + ASSERT_EQ("", st1.dim_name(0)); + ASSERT_EQ("", st1.dim_name(1)); + ASSERT_EQ("", st1.dim_name(2)); + + const int64_t* ptr = reinterpret_cast<const int64_t*>(st1.raw_data()); + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 1, ptr[i]); + } + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 11, ptr[i + 6]); + } + + const auto& si = internal::checked_cast<const SparseCOOIndex&>(*st1.sparse_index()); + std::shared_ptr<SparseCOOIndex::CoordsTensor> sidx = si.indices(); + ASSERT_EQ(std::vector<int64_t>({12, 3}), sidx->shape()); + ASSERT_TRUE(sidx->is_column_major()); + + // (0, 0, 0) -> 1 + ASSERT_EQ(0, sidx->Value({0, 0})); + ASSERT_EQ(0, sidx->Value({0, 1})); + ASSERT_EQ(0, sidx->Value({0, 2})); + + // (0, 0, 2) -> 2 + ASSERT_EQ(0, sidx->Value({1, 0})); + ASSERT_EQ(0, sidx->Value({1, 1})); + ASSERT_EQ(2, sidx->Value({1, 2})); + + // (0, 1, 1) -> 3 + ASSERT_EQ(0, sidx->Value({2, 0})); + ASSERT_EQ(1, sidx->Value({2, 1})); + ASSERT_EQ(1, sidx->Value({2, 2})); + + // (1, 2, 1) -> 15 + ASSERT_EQ(1, sidx->Value({10, 0})); + ASSERT_EQ(2, sidx->Value({10, 1})); + ASSERT_EQ(1, sidx->Value({10, 2})); + + // (1, 2, 3) -> 16 + ASSERT_EQ(1, sidx->Value({11, 0})); + ASSERT_EQ(2, sidx->Value({11, 1})); + ASSERT_EQ(3, sidx->Value({11, 2})); +} + +TEST(TestSparseCSRMatrix, CreationFromNumericTensor2D) { + std::vector<int64_t> shape = {6, 4}; + std::vector<int64_t> values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + std::shared_ptr<Buffer> buffer = Buffer::Wrap(values); + std::vector<std::string> dim_names = 
{"foo", "bar", "baz"}; + NumericTensor<Int64Type> tensor1(buffer, shape); + NumericTensor<Int64Type> tensor2(buffer, shape, {}, dim_names); + + SparseTensorImpl<SparseCSRIndex> st1(tensor1); + SparseTensorImpl<SparseCSRIndex> st2(tensor2); + + CheckSparseIndexFormatType(SparseTensorFormat::CSR, st1); + + ASSERT_EQ(12, st1.non_zero_length()); + ASSERT_TRUE(st1.is_mutable()); + + ASSERT_EQ("foo", st2.dim_name(0)); + ASSERT_EQ("bar", st2.dim_name(1)); + ASSERT_EQ("baz", st2.dim_name(2)); + + ASSERT_EQ("", st1.dim_name(0)); + ASSERT_EQ("", st1.dim_name(1)); + ASSERT_EQ("", st1.dim_name(2)); + + const int64_t* ptr = reinterpret_cast<const int64_t*>(st1.raw_data()); + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 1, ptr[i]); + } + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 11, ptr[i + 6]); + } + + const auto& si = internal::checked_cast<const SparseCSRIndex&>(*st1.sparse_index()); + + ASSERT_EQ(std::string("SparseCSRIndex"), si.ToString()); + ASSERT_EQ(1, si.indptr()->ndim()); + ASSERT_EQ(1, si.indices()->ndim()); + + const int64_t* indptr_begin = reinterpret_cast<const int64_t*>(si.indptr()->raw_data()); + std::vector<int64_t> indptr_values(indptr_begin, + indptr_begin + si.indptr()->shape()[0]); + + ASSERT_EQ(7, indptr_values.size()); + ASSERT_EQ(std::vector<int64_t>({0, 2, 4, 6, 8, 10, 12}), indptr_values); + + const int64_t* indices_begin = + reinterpret_cast<const int64_t*>(si.indices()->raw_data()); + std::vector<int64_t> indices_values(indices_begin, + indices_begin + si.indices()->shape()[0]); + + ASSERT_EQ(12, indices_values.size()); + ASSERT_EQ(std::vector<int64_t>({0, 2, 1, 3, 0, 2, 1, 3, 0, 2, 1, 3}), indices_values); +} + +} // namespace arrow diff --git a/cpp/src/arrow/sparse_tensor.cc b/cpp/src/arrow/sparse_tensor.cc new file mode 100644 index 0000000..101500d --- /dev/null +++ b/cpp/src/arrow/sparse_tensor.cc @@ -0,0 +1,452 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/sparse_tensor.h" + +#include <functional> +#include <memory> +#include <numeric> + +#include "arrow/compare.h" +#include "arrow/util/logging.h" + +namespace arrow { + +namespace { + +// ---------------------------------------------------------------------- +// SparseTensorConverter + +template <typename TYPE, typename SparseIndexType> +class SparseTensorConverter { + public: + explicit SparseTensorConverter(const NumericTensor<TYPE>&) {} + + Status Convert() { return Status::Invalid("Unsupported sparse index"); } +}; + +// ---------------------------------------------------------------------- +// SparseTensorConverter for SparseCOOIndex + +template <typename TYPE> +struct SparseTensorConverterBase { + using NumericTensorType = NumericTensor<TYPE>; + using value_type = typename NumericTensorType::value_type; + + explicit SparseTensorConverterBase(const NumericTensorType& tensor) : tensor_(tensor) {} + + bool TensorIsTriviallyIterable() const { + return tensor_.ndim() <= 1 || tensor_.is_contiguous(); + } + + size_t CountNonZero() const { + if (tensor_.size() == 0) { + return 0; + } + + if (TensorIsTriviallyIterable()) { + const value_type* data = reinterpret_cast<const value_type*>(tensor_.raw_data()); + return std::count_if(data, data + tensor_.size(), + 
[](value_type x) { return x != 0; }); + } + + const std::vector<int64_t>& shape = tensor_.shape(); + const int64_t ndim = tensor_.ndim(); + + size_t count = 0; + std::vector<int64_t> coord(ndim, 0); + for (int64_t n = tensor_.size(); n > 0; n--) { + if (tensor_.Value(coord) != 0) { + ++count; + } + + // increment index + ++coord[ndim - 1]; + if (n > 1 && coord[ndim - 1] == shape[ndim - 1]) { + int64_t d = ndim - 1; + while (d > 0 && coord[d] == shape[d]) { + coord[d] = 0; + ++coord[d - 1]; + --d; + } + } + } + return count; + } + + const NumericTensorType& tensor_; +}; + +template <typename TYPE> +class SparseTensorConverter<TYPE, SparseCOOIndex> + : private SparseTensorConverterBase<TYPE> { + public: + using BaseClass = SparseTensorConverterBase<TYPE>; + using NumericTensorType = typename BaseClass::NumericTensorType; + using value_type = typename BaseClass::value_type; + + explicit SparseTensorConverter(const NumericTensorType& tensor) : BaseClass(tensor) {} + + Status Convert() { + const int64_t ndim = tensor_.ndim(); + const int64_t nonzero_count = static_cast<int64_t>(CountNonZero()); + + std::shared_ptr<Buffer> indices_buffer; + RETURN_NOT_OK( + AllocateBuffer(sizeof(int64_t) * ndim * nonzero_count, &indices_buffer)); + int64_t* indices = reinterpret_cast<int64_t*>(indices_buffer->mutable_data()); + + std::shared_ptr<Buffer> values_buffer; + RETURN_NOT_OK(AllocateBuffer(sizeof(value_type) * nonzero_count, &values_buffer)); + value_type* values = reinterpret_cast<value_type*>(values_buffer->mutable_data()); + + if (ndim <= 1) { + const value_type* data = reinterpret_cast<const value_type*>(tensor_.raw_data()); + const int64_t count = ndim == 0 ? 
1 : tensor_.shape()[0]; + for (int64_t i = 0; i < count; ++i, ++data) { + if (*data != 0) { + *indices++ = i; + *values++ = *data; + } + } + } else { + const std::vector<int64_t>& shape = tensor_.shape(); + std::vector<int64_t> coord(ndim, 0); + + for (int64_t n = tensor_.size(); n > 0; n--) { + const value_type x = tensor_.Value(coord); + if (tensor_.Value(coord) != 0) { + *values++ = x; + + int64_t* indp = indices; + for (int64_t i = 0; i < ndim; ++i) { + *indp = coord[i]; + indp += nonzero_count; + } + indices++; + } + + // increment index + ++coord[ndim - 1]; + if (n > 1 && coord[ndim - 1] == shape[ndim - 1]) { + int64_t d = ndim - 1; + while (d > 0 && coord[d] == shape[d]) { + coord[d] = 0; + ++coord[d - 1]; + --d; + } + } + } + } + + // make results + const std::vector<int64_t> indices_shape = {nonzero_count, ndim}; + const int64_t indices_elsize = sizeof(int64_t); + const std::vector<int64_t> indices_strides = {indices_elsize, + indices_elsize * nonzero_count}; + sparse_index = + std::make_shared<SparseCOOIndex>(std::make_shared<SparseCOOIndex::CoordsTensor>( + indices_buffer, indices_shape, indices_strides)); + data = values_buffer; + + return Status::OK(); + } + + std::shared_ptr<SparseCOOIndex> sparse_index; + std::shared_ptr<Buffer> data; + + private: + using SparseTensorConverterBase<TYPE>::tensor_; + using SparseTensorConverterBase<TYPE>::CountNonZero; +}; + +template <typename TYPE, typename SparseIndexType> +void MakeSparseTensorFromTensor(const Tensor& tensor, + std::shared_ptr<SparseIndex>* sparse_index, + std::shared_ptr<Buffer>* data) { + NumericTensor<TYPE> numeric_tensor(tensor.data(), tensor.shape(), tensor.strides()); + SparseTensorConverter<TYPE, SparseIndexType> converter(numeric_tensor); + DCHECK_OK(converter.Convert()); + *sparse_index = converter.sparse_index; + *data = converter.data; +} + +// ---------------------------------------------------------------------- +// SparseTensorConverter for SparseCSRIndex + +template <typename TYPE> 
+class SparseTensorConverter<TYPE, SparseCSRIndex> + : private SparseTensorConverterBase<TYPE> { + public: + using BaseClass = SparseTensorConverterBase<TYPE>; + using NumericTensorType = typename BaseClass::NumericTensorType; + using value_type = typename BaseClass::value_type; + + explicit SparseTensorConverter(const NumericTensorType& tensor) : BaseClass(tensor) {} + + Status Convert() { + const int64_t ndim = tensor_.ndim(); + if (ndim > 2) { + return Status::Invalid("Invalid tensor dimension"); + } + + const int64_t nr = tensor_.shape()[0]; + const int64_t nc = tensor_.shape()[1]; + const int64_t nonzero_count = static_cast<int64_t>(CountNonZero()); + + std::shared_ptr<Buffer> indptr_buffer; + std::shared_ptr<Buffer> indices_buffer; + + std::shared_ptr<Buffer> values_buffer; + RETURN_NOT_OK(AllocateBuffer(sizeof(value_type) * nonzero_count, &values_buffer)); + value_type* values = reinterpret_cast<value_type*>(values_buffer->mutable_data()); + + if (ndim <= 1) { + return Status::NotImplemented("TODO for ndim <= 1"); + } else { + RETURN_NOT_OK(AllocateBuffer(sizeof(int64_t) * (nr + 1), &indptr_buffer)); + int64_t* indptr = reinterpret_cast<int64_t*>(indptr_buffer->mutable_data()); + + RETURN_NOT_OK(AllocateBuffer(sizeof(int64_t) * nonzero_count, &indices_buffer)); + int64_t* indices = reinterpret_cast<int64_t*>(indices_buffer->mutable_data()); + + int64_t k = 0; + *indptr++ = 0; + for (int64_t i = 0; i < nr; ++i) { + for (int64_t j = 0; j < nc; ++j) { + const value_type x = tensor_.Value({i, j}); + if (x != 0) { + *values++ = x; + *indices++ = j; + k++; + } + } + *indptr++ = k; + } + } + + std::vector<int64_t> indptr_shape({nr + 1}); + std::shared_ptr<SparseCSRIndex::IndexTensor> indptr_tensor = + std::make_shared<SparseCSRIndex::IndexTensor>(indptr_buffer, indptr_shape); + + std::vector<int64_t> indices_shape({nonzero_count}); + std::shared_ptr<SparseCSRIndex::IndexTensor> indices_tensor = + std::make_shared<SparseCSRIndex::IndexTensor>(indices_buffer, 
indices_shape); + + sparse_index = std::make_shared<SparseCSRIndex>(indptr_tensor, indices_tensor); + data = values_buffer; + + return Status::OK(); + } + + std::shared_ptr<SparseCSRIndex> sparse_index; + std::shared_ptr<Buffer> data; + + private: + using BaseClass::tensor_; + using SparseTensorConverterBase<TYPE>::CountNonZero; +}; + +// ---------------------------------------------------------------------- +// Instantiate templates + +#define INSTANTIATE_SPARSE_TENSOR_CONVERTER(IndexType) \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<UInt8Type, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<UInt16Type, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<UInt32Type, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<UInt64Type, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<Int8Type, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<Int16Type, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<Int32Type, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<Int64Type, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<HalfFloatType, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<FloatType, IndexType>; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter<DoubleType, IndexType> + +INSTANTIATE_SPARSE_TENSOR_CONVERTER(SparseCOOIndex); +INSTANTIATE_SPARSE_TENSOR_CONVERTER(SparseCSRIndex); + +} // namespace + +// ---------------------------------------------------------------------- +// SparseCOOIndex + +// Constructor with a column-major NumericTensor +SparseCOOIndex::SparseCOOIndex(const std::shared_ptr<CoordsTensor>& coords) + : SparseIndexBase(coords->shape()[0]), coords_(coords) { + DCHECK(coords_->is_column_major()); +} + +std::string SparseCOOIndex::ToString() const { return std::string("SparseCOOIndex"); } + +// 
---------------------------------------------------------------------- +// SparseCSRIndex + +// Constructor with two index vectors +SparseCSRIndex::SparseCSRIndex(const std::shared_ptr<IndexTensor>& indptr, + const std::shared_ptr<IndexTensor>& indices) + : SparseIndexBase(indices->shape()[0]), indptr_(indptr), indices_(indices) { + DCHECK_EQ(1, indptr_->ndim()); + DCHECK_EQ(1, indices_->ndim()); +} + +std::string SparseCSRIndex::ToString() const { return std::string("SparseCSRIndex"); } + +// ---------------------------------------------------------------------- +// SparseTensor + +// Constructor with all attributes +SparseTensor::SparseTensor(const std::shared_ptr<DataType>& type, + const std::shared_ptr<Buffer>& data, + const std::vector<int64_t>& shape, + const std::shared_ptr<SparseIndex>& sparse_index, + const std::vector<std::string>& dim_names) + : type_(type), + data_(data), + shape_(shape), + sparse_index_(sparse_index), + dim_names_(dim_names) { + DCHECK(is_tensor_supported(type->id())); +} + +const std::string& SparseTensor::dim_name(int i) const { + static const std::string kEmpty = ""; + if (dim_names_.size() == 0) { + return kEmpty; + } else { + DCHECK_LT(i, static_cast<int>(dim_names_.size())); + return dim_names_[i]; + } +} + +int64_t SparseTensor::size() const { + return std::accumulate(shape_.begin(), shape_.end(), 1LL, std::multiplies<int64_t>()); +} + +bool SparseTensor::Equals(const SparseTensor& other) const { + return SparseTensorEquals(*this, other); +} + +// ---------------------------------------------------------------------- +// SparseTensorImpl + +// Constructor for empty sparse tensor +template <typename SparseIndexType> +SparseTensorImpl<SparseIndexType>::SparseTensorImpl( + const std::shared_ptr<DataType>& type, const std::vector<int64_t>& shape, + const std::vector<std::string>& dim_names) + : SparseTensorImpl(nullptr, type, nullptr, shape, dim_names) {} + +// Constructor with a dense numeric tensor +template <typename SparseIndexType> 
+template <typename TYPE> +SparseTensorImpl<SparseIndexType>::SparseTensorImpl(const NumericTensor<TYPE>& tensor) + : SparseTensorImpl(nullptr, tensor.type(), nullptr, tensor.shape(), + tensor.dim_names_) { + SparseTensorConverter<TYPE, SparseIndexType> converter(tensor); + DCHECK_OK(converter.Convert()); + sparse_index_ = converter.sparse_index; + data_ = converter.data; +} + +// Constructor with a dense tensor +template <typename SparseIndexType> +SparseTensorImpl<SparseIndexType>::SparseTensorImpl(const Tensor& tensor) + : SparseTensorImpl(nullptr, tensor.type(), nullptr, tensor.shape(), + tensor.dim_names_) { + switch (tensor.type()->id()) { + case Type::UINT8: + MakeSparseTensorFromTensor<UInt8Type, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::INT8: + MakeSparseTensorFromTensor<Int8Type, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::UINT16: + MakeSparseTensorFromTensor<UInt16Type, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::INT16: + MakeSparseTensorFromTensor<Int16Type, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::UINT32: + MakeSparseTensorFromTensor<UInt32Type, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::INT32: + MakeSparseTensorFromTensor<Int32Type, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::UINT64: + MakeSparseTensorFromTensor<UInt64Type, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::INT64: + MakeSparseTensorFromTensor<Int64Type, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::HALF_FLOAT: + MakeSparseTensorFromTensor<HalfFloatType, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::FLOAT: + MakeSparseTensorFromTensor<FloatType, SparseIndexType>(tensor, &sparse_index_, + &data_); + return; + case Type::DOUBLE: + MakeSparseTensorFromTensor<DoubleType, SparseIndexType>(tensor, &sparse_index_, + 
&data_); + return; + default: + break; + } +} + +// ---------------------------------------------------------------------- +// Instantiate templates + +#define INSTANTIATE_SPARSE_TENSOR(IndexType) \ + template class ARROW_TEMPLATE_EXPORT SparseTensorImpl<IndexType>; \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<UInt8Type>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<UInt16Type>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<UInt32Type>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<UInt64Type>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<Int8Type>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<Int16Type>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<Int32Type>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<Int64Type>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<HalfFloatType>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<FloatType>&); \ + template ARROW_EXPORT SparseTensorImpl<IndexType>::SparseTensorImpl( \ + const NumericTensor<DoubleType>&) + +INSTANTIATE_SPARSE_TENSOR(SparseCOOIndex); +INSTANTIATE_SPARSE_TENSOR(SparseCSRIndex); + +} // namespace arrow diff --git a/cpp/src/arrow/sparse_tensor.h b/cpp/src/arrow/sparse_tensor.h new file mode 100644 index 0000000..c7693d2 --- /dev/null +++ b/cpp/src/arrow/sparse_tensor.h @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_SPARSE_TENSOR_H +#define ARROW_SPARSE_TENSOR_H + +#include <memory> +#include <string> +#include <vector> + +#include "arrow/tensor.h" + +namespace arrow { + +// ---------------------------------------------------------------------- +// SparseIndex class + +/// \brief EXPERIMENTAL: Sparse tensor format enumeration +struct SparseTensorFormat { + enum type { COO, CSR }; +}; + +/// \brief EXPERIMENTAL: The base class for representing index of non-zero +/// values in sparse tensor +class ARROW_EXPORT SparseIndex { + public: + explicit SparseIndex(SparseTensorFormat::type format_id, int64_t non_zero_length) + : format_id_(format_id), non_zero_length_(non_zero_length) {} + + virtual ~SparseIndex() = default; + + SparseTensorFormat::type format_id() const { return format_id_; } + int64_t non_zero_length() const { return non_zero_length_; } + + virtual std::string ToString() const = 0; + + protected: + SparseTensorFormat::type format_id_; + int64_t non_zero_length_; +}; + +template <typename SparseIndexType> +class SparseIndexBase : public SparseIndex { + public: + explicit SparseIndexBase(int64_t non_zero_length) + : SparseIndex(SparseIndexType::format_id, non_zero_length) {} +}; + +// ---------------------------------------------------------------------- +// 
SparseCOOIndex class
+
+/// \brief EXPERIMENTAL: The index data for COO sparse tensor
+class ARROW_EXPORT SparseCOOIndex : public SparseIndexBase<SparseCOOIndex> {
+ public:
+  using CoordsTensor = NumericTensor<Int64Type>;
+
+  static constexpr SparseTensorFormat::type format_id = SparseTensorFormat::COO;
+
+  // Constructor with a column-major NumericTensor
+  explicit SparseCOOIndex(const std::shared_ptr<CoordsTensor>& coords);
+
+  const std::shared_ptr<CoordsTensor>& indices() const { return coords_; }
+
+  std::string ToString() const override;
+
+  bool Equals(const SparseCOOIndex& other) const {
+    return indices()->Equals(*other.indices());
+  }
+
+ protected:
+  std::shared_ptr<CoordsTensor> coords_;
+};
+
+// ----------------------------------------------------------------------
+// SparseCSRIndex class
+
+/// \brief EXPERIMENTAL: The index data for CSR sparse matrix
+class ARROW_EXPORT SparseCSRIndex : public SparseIndexBase<SparseCSRIndex> {
+ public:
+  using IndexTensor = NumericTensor<Int64Type>;
+
+  static constexpr SparseTensorFormat::type format_id = SparseTensorFormat::CSR;
+
+  // Constructor with two index vectors
+  explicit SparseCSRIndex(const std::shared_ptr<IndexTensor>& indptr,
+                          const std::shared_ptr<IndexTensor>& indices);
+
+  const std::shared_ptr<IndexTensor>& indptr() const { return indptr_; }
+  const std::shared_ptr<IndexTensor>& indices() const { return indices_; }
+
+  std::string ToString() const override;
+
+  bool Equals(const SparseCSRIndex& other) const {
+    return indptr()->Equals(*other.indptr()) && indices()->Equals(*other.indices());
+  }
+
+ protected:
+  std::shared_ptr<IndexTensor> indptr_;
+  std::shared_ptr<IndexTensor> indices_;
+};
+
+// ----------------------------------------------------------------------
+// SparseTensor class
+
+/// \brief EXPERIMENTAL: The base class of sparse tensor container
+class ARROW_EXPORT SparseTensor {
+ public:
+  virtual ~SparseTensor() = default;
+
+  SparseTensorFormat::type format_id() const { return sparse_index_->format_id(); }
+
+  std::shared_ptr<DataType> type() const { return type_; }
+  std::shared_ptr<Buffer> data() const { return data_; }
+
+  const uint8_t* raw_data() const { return data_->data(); }
+  uint8_t* raw_mutable_data() const { return data_->mutable_data(); }
+
+  const std::vector<int64_t>& shape() const { return shape_; }
+
+  const std::shared_ptr<SparseIndex>& sparse_index() const { return sparse_index_; }
+
+  int ndim() const { return static_cast<int>(shape_.size()); }
+
+  const std::string& dim_name(int i) const;
+
+  /// Total number of value cells in the sparse tensor
+  int64_t size() const;
+
+  /// Return true if the underlying data buffer is mutable
+  bool is_mutable() const { return data_->is_mutable(); }
+
+  /// Total number of non-zero cells in the sparse tensor
+  int64_t non_zero_length() const {
+    return sparse_index_ ? sparse_index_->non_zero_length() : 0;
+  }
+
+  bool Equals(const SparseTensor& other) const;
+
+ protected:
+  // Constructor with all attributes
+  SparseTensor(const std::shared_ptr<DataType>& type, const std::shared_ptr<Buffer>& data,
+               const std::vector<int64_t>& shape,
+               const std::shared_ptr<SparseIndex>& sparse_index,
+               const std::vector<std::string>& dim_names);
+
+  std::shared_ptr<DataType> type_;
+  std::shared_ptr<Buffer> data_;
+  std::vector<int64_t> shape_;
+  std::shared_ptr<SparseIndex> sparse_index_;
+
+  /// These names are optional
+  std::vector<std::string> dim_names_;
+};
+
+// ----------------------------------------------------------------------
+// SparseTensorImpl class
+
+/// \brief EXPERIMENTAL: Concrete sparse tensor implementation classes with sparse index
+/// type
+template <typename SparseIndexType>
+class ARROW_EXPORT SparseTensorImpl : public SparseTensor {
+ public:
+  virtual ~SparseTensorImpl() = default;
+
+  // Constructor with all attributes
+  SparseTensorImpl(const std::shared_ptr<SparseIndexType>& sparse_index,
+                   const std::shared_ptr<DataType>& type,
+                   const std::shared_ptr<Buffer>& data, const std::vector<int64_t>& shape,
+                   const std::vector<std::string>& dim_names)
+      : SparseTensor(type, data, shape, sparse_index, dim_names) {}
+
+  // Constructor for empty sparse tensor
+  SparseTensorImpl(const std::shared_ptr<DataType>& type,
+                   const std::vector<int64_t>& shape,
+                   const std::vector<std::string>& dim_names = {});
+
+  // Constructor with a dense numeric tensor
+  template <typename TYPE>
+  explicit SparseTensorImpl(const NumericTensor<TYPE>& tensor);
+
+  // Constructor with a dense tensor
+  explicit SparseTensorImpl(const Tensor& tensor);
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(SparseTensorImpl);
+};
+
+/// \brief EXPERIMENTAL: Type alias for COO sparse tensor
+using SparseTensorCOO = SparseTensorImpl<SparseCOOIndex>;
+
+/// \brief EXPERIMENTAL: Type alias for CSR sparse matrix
+using SparseTensorCSR = SparseTensorImpl<SparseCSRIndex>;
+using SparseMatrixCSR = SparseTensorImpl<SparseCSRIndex>;
+
+}  // namespace arrow
+
+#endif  // ARROW_SPARSE_TENSOR_H
diff --git a/cpp/src/arrow/tensor.h b/cpp/src/arrow/tensor.h
index a9b5df8..e81f0f0 100644
--- a/cpp/src/arrow/tensor.h
+++ b/cpp/src/arrow/tensor.h
@@ -50,6 +50,9 @@ static inline bool is_tensor_supported(Type::type type_id) {
   return false;
 }
 
+template <typename SparseIndexType>
+class SparseTensorImpl;
+
 class ARROW_EXPORT Tensor {
  public:
   virtual ~Tensor() = default;
@@ -110,6 +113,9 @@ class ARROW_EXPORT Tensor {
   /// These names are optional
   std::vector<std::string> dim_names_;
 
+  template <typename SparseIndexType>
+  friend class SparseTensorImpl;
+
  private:
   ARROW_DISALLOW_COPY_AND_ASSIGN(Tensor);
 };
diff --git a/docs/source/format/IPC.rst b/docs/source/format/IPC.rst
index 8cb74b8..62a1237 100644
--- a/docs/source/format/IPC.rst
+++ b/docs/source/format/IPC.rst
@@ -234,4 +234,28 @@ region) to be multiples of 64 bytes: ::
     <metadata>
     <tensor body>
 
+SparseTensor Message Format
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The ``SparseTensor`` message type provides another way to write a
+multidimensional array of fixed-size values using Arrow's shared memory tools
+in addition to ``Tensor``. ``SparseTensor`` is designed specifically for tensors
+whose elements are mostly zeros. As with ``Tensor``, Arrow implementations in
+general are not required to implement this data format.
+
+When writing a standalone encapsulated sparse tensor message, we use the format as
+indicated above, but additionally align the starting offset of the metadata as
+well as the starting offsets of the sparse index and the sparse tensor body
+(if writing to a shared memory region) to be multiples of 64 bytes: ::
+
+    <PADDING>
+    <metadata size: int32>
+    <metadata>
+    <sparse index>
+    <PADDING>
+    <sparse tensor body>
+
+The contents of the sparse tensor index depend on which sparse format is
+used.
+
 .. _Flatbuffer: https://github.com/google/flatbuffers
diff --git a/format/Message.fbs b/format/Message.fbs
index 8307181..e14fdca 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -87,7 +87,7 @@ table DictionaryBatch {
 /// which may include experimental metadata types. For maximum compatibility,
 /// it is best to send data using RecordBatch
 union MessageHeader {
-  Schema, DictionaryBatch, RecordBatch, Tensor
+  Schema, DictionaryBatch, RecordBatch, Tensor, SparseTensor
 }
 
 table Message {
@@ -96,4 +96,4 @@ table Message {
   bodyLength: long;
 }
 
-root_type Message;
\ No newline at end of file
+root_type Message;
diff --git a/format/Tensor.fbs b/format/Tensor.fbs
index 18b614c..e77b353 100644
--- a/format/Tensor.fbs
+++ b/format/Tensor.fbs
@@ -23,6 +23,9 @@ include "Schema.fbs";
 
 namespace org.apache.arrow.flatbuf;
 
+/// ----------------------------------------------------------------------
+/// Data structures for dense tensors
+
 /// Shape data for a single axis in a tensor
 table TensorDim {
   /// Length of dimension
@@ -48,3 +51,96 @@ table Tensor {
 }
 
 root_type Tensor;
+
+/// ----------------------------------------------------------------------
+/// EXPERIMENTAL: Data structures for sparse tensors
+
+/// Coordinate format of sparse tensor index.
+table SparseTensorIndexCOO {
+  /// COO's index list is represented as an NxM matrix,
+  /// where N is the number of non-zero values,
+  /// and M is the number of dimensions of a sparse tensor.
+  /// indicesBuffer stores the location and size of this index matrix.
+  /// The type of index value is long, so the stride for the index matrix is unnecessary.
+  ///
+  /// For example, let X be a 2x3x4x5 tensor, and it has the following 6 non-zero values:
+  ///
+  ///   X[0, 1, 2, 0] := 1
+  ///   X[1, 1, 2, 3] := 2
+  ///   X[0, 2, 1, 0] := 3
+  ///   X[0, 1, 3, 0] := 4
+  ///   X[0, 1, 2, 1] := 5
+  ///   X[1, 2, 0, 4] := 6
+  ///
+  /// In COO format, the index matrix of X is the following 4x6 matrix:
+  ///
+  ///   [[0, 0, 0, 0, 1, 1],
+  ///    [1, 1, 1, 2, 1, 2],
+  ///    [2, 2, 3, 1, 2, 0],
+  ///    [0, 1, 0, 0, 3, 4]]
+  ///
+  /// Note that the indices are sorted in lexicographical order.
+  indicesBuffer: Buffer;
+}
+
+/// Compressed Sparse Row format, which is matrix-specific.
+table SparseMatrixIndexCSR {
+  /// indptrBuffer stores the location and size of indptr array that
+  /// represents the range of the rows.
+  /// The i-th row spans from indptr[i] to indptr[i+1] in the data.
+  /// The length of this array is 1 + (the number of rows), and the type
+  /// of index value is long.
+  ///
+  /// For example, let X be the following 6x4 matrix:
+  ///
+  ///   X := [[0, 1, 2, 0],
+  ///         [0, 0, 3, 0],
+  ///         [0, 4, 0, 5],
+  ///         [0, 0, 0, 0],
+  ///         [6, 0, 7, 8],
+  ///         [0, 9, 0, 0]].
+  ///
+  /// The array of non-zero values in X is:
+  ///
+  ///   values(X) = [1, 2, 3, 4, 5, 6, 7, 8, 9].
+  ///
+  /// And the indptr of X is:
+  ///
+  ///   indptr(X) = [0, 2, 3, 5, 5, 8, 9].
+  indptrBuffer: Buffer;
+
+  /// indicesBuffer stores the location and size of the array that
+  /// contains the column indices of the corresponding non-zero values.
+  /// The type of index value is long.
+  ///
+  /// For example, the indices of the above X are:
+  ///
+  ///   indices(X) = [1, 2, 2, 1, 3, 0, 2, 3, 1].
+  indicesBuffer: Buffer;
+}
+
+union SparseTensorIndex {
+  SparseTensorIndexCOO,
+  SparseMatrixIndexCSR
+}
+
+table SparseTensor {
+  /// The type of data contained in a value cell.
+  /// Currently only fixed-width value types are supported,
+  /// no strings or nested types.
+  type: Type;
+
+  /// The dimensions of the tensor, optionally named.
+  shape: [TensorDim];
+
+  /// The number of non-zero values in a sparse tensor.
+  non_zero_length: long;
+
+  /// Sparse tensor index
+  sparseIndex: SparseTensorIndex;
+
+  /// The location and size of the tensor's data
+  data: Buffer;
+}
+
+root_type SparseTensor;
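The following C++ sketch makes the COO example in `SparseTensorIndexCOO` concrete. It uses only the standard library, not Arrow's API. It sorts (coordinate, value) pairs lexicographically and lays out the coordinates with one row per dimension and one column per non-zero value, matching the 4x6 matrix shown in the schema comment. `CooIndexSketch` and `BuildCooIndex` are hypothetical names for illustration. Arrow's own `SparseCOOIndex` keeps its coordinates in a `NumericTensor<Int64Type>` instead.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical container (not an Arrow type) for a COO index and its values.
struct CooIndexSketch {
  // index[d][k] is the d-th coordinate of the k-th non-zero value:
  // one row per dimension, one column per non-zero value.
  std::vector<std::vector<int64_t>> index;
  // Non-zero values, reordered to match the sorted columns of `index`.
  std::vector<double> values;
};

// Hypothetical helper: builds a lexicographically sorted COO index from
// unsorted coordinates. Assumes every coordinate has the same length.
CooIndexSketch BuildCooIndex(const std::vector<std::vector<int64_t>>& coords,
                             const std::vector<double>& values) {
  const std::size_t nnz = coords.size();
  const std::size_t ndim = nnz == 0 ? 0 : coords[0].size();

  // Sort a permutation rather than the data, so values follow their coordinates.
  // std::vector's operator< compares lexicographically.
  std::vector<std::size_t> order(nnz);
  std::iota(order.begin(), order.end(), std::size_t{0});
  std::sort(order.begin(), order.end(), [&coords](std::size_t a, std::size_t b) {
    return coords[a] < coords[b];
  });

  CooIndexSketch result;
  result.index.assign(ndim, std::vector<int64_t>(nnz));
  result.values.resize(nnz);
  for (std::size_t k = 0; k < nnz; ++k) {
    for (std::size_t d = 0; d < ndim; ++d) {
      result.index[d][k] = coords[order[k]][d];
    }
    result.values[k] = values[order[k]];
  }
  return result;
}
```

Suppose the six non-zero values of the 2x3x4x5 example are passed in the order listed. The sketch should then reproduce the 4x6 index matrix from the comment and reorder the values to `[1, 5, 4, 3, 2, 6]`.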
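The `indptr` and `indices` arrays described for `SparseMatrixIndexCSR` can be sketched as a single row-major scan of a dense matrix. This is a standard-library-only illustration that assumes an `int64_t` dense input. `CsrSketch` and `DenseToCsr` are hypothetical names. They are not the conversion performed by Arrow's `SparseTensorImpl<SparseCSRIndex>` constructors.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical container (not an Arrow type) for the three CSR arrays.
struct CsrSketch {
  std::vector<int64_t> indptr;   // length = rows + 1; row i spans [indptr[i], indptr[i+1])
  std::vector<int64_t> indices;  // column index of each non-zero value
  std::vector<int64_t> values;   // non-zero values in row-major order
};

// Hypothetical helper: converts a dense row-major matrix to CSR in one pass.
CsrSketch DenseToCsr(const std::vector<std::vector<int64_t>>& dense) {
  CsrSketch csr;
  csr.indptr.push_back(0);  // the first row always starts at offset 0
  for (const auto& row : dense) {
    for (std::size_t col = 0; col < row.size(); ++col) {
      if (row[col] != 0) {
        csr.indices.push_back(static_cast<int64_t>(col));
        csr.values.push_back(row[col]);
      }
    }
    // The running count of non-zeros marks the end of this row's range.
    csr.indptr.push_back(static_cast<int64_t>(csr.values.size()));
  }
  return csr;
}
```

Each row appends the running non-zero count, so the final entry of `indptr` always equals the total number of non-zero values. For the 6x4 example matrix that total is 9. An empty row, such as the fourth row of the example, simply repeats the previous entry.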
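The 64-byte alignment rule in the IPC documentation amounts to rounding byte offsets up to the next multiple of 64. The minimal sketch below assumes non-negative offsets and a positive alignment. `AlignUp`, `PaddingFor`, and `kAlignment` are hypothetical helpers, not Arrow functions.

```cpp
#include <cstdint>

// Hypothetical constant: the alignment required by the encapsulated message format.
constexpr int64_t kAlignment = 64;

// Rounds a non-negative byte offset up to the next multiple of `alignment`.
int64_t AlignUp(int64_t offset, int64_t alignment = kAlignment) {
  return (offset + alignment - 1) / alignment * alignment;
}

// Number of <PADDING> bytes to write so that the next section starts aligned.
int64_t PaddingFor(int64_t offset, int64_t alignment = kAlignment) {
  return AlignUp(offset, alignment) - offset;
}
```

For instance, suppose a sparse index ends at byte offset 100. A writer following this rule would emit 28 padding bytes, so the sparse tensor body starts at offset 128.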