This is an automated email from the ASF dual-hosted git repository. apitrou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 93688e8 ARROW-5977: [C++] [Python] Allow specifying which columns to include 93688e8 is described below commit 93688e8c1fa2f22d46394c548a9edbd3d2d7c62d Author: Antoine Pitrou <anto...@python.org> AuthorDate: Mon Aug 12 14:05:54 2019 +0200 ARROW-5977: [C++] [Python] Allow specifying which columns to include If the `include_columns` option is not empty: - columns are included in that order - columns not in `include_columns` are ignored - columns in `include_columns` but not in the CSV file produce a null column Closes #5026 from pitrou/ARROW-5977-csv-include-columns and squashes the following commits: 0f198ef0b <Antoine Pitrou> ARROW-5977: Allow specifying which columns to include Authored-by: Antoine Pitrou <anto...@python.org> Signed-off-by: Antoine Pitrou <anto...@python.org> --- cpp/src/arrow/csv/column-builder-test.cc | 135 ++++++++++++++++++++++++++----- cpp/src/arrow/csv/column-builder.cc | 92 +++++++++++++++++++-- cpp/src/arrow/csv/column-builder.h | 12 ++- cpp/src/arrow/csv/options.h | 13 +++ cpp/src/arrow/csv/parser.cc | 6 +- cpp/src/arrow/csv/reader.cc | 118 ++++++++++++++++++++------- cpp/src/arrow/csv/reader.h | 1 - cpp/src/arrow/table.cc | 15 +++- python/pyarrow/_csv.pyx | 54 ++++++++++++- python/pyarrow/error.pxi | 4 +- python/pyarrow/includes/libarrow.pxd | 2 + python/pyarrow/table.pxi | 7 ++ python/pyarrow/tests/test_csv.py | 104 +++++++++++++++++++++++- python/pyarrow/tests/test_table.py | 4 + 14 files changed, 498 insertions(+), 69 deletions(-) diff --git a/cpp/src/arrow/csv/column-builder-test.cc b/cpp/src/arrow/csv/column-builder-test.cc index f2c39aa..62e8d06 100644 --- a/cpp/src/arrow/csv/column-builder-test.cc +++ b/cpp/src/arrow/csv/column-builder-test.cc @@ -24,6 +24,7 @@ #include "arrow/csv/column-builder.h" #include "arrow/csv/options.h" #include "arrow/csv/test-common.h" +#include "arrow/memory_pool.h" #include "arrow/table.h" #include "arrow/testing/util.h" 
#include "arrow/type.h" @@ -48,6 +49,74 @@ void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder, } ASSERT_OK(builder->task_group()->Finish()); ASSERT_OK(builder->Finish(out)); + ASSERT_OK((*out)->Validate()); +} + +////////////////////////////////////////////////////////////////////////// +// Tests for null column builder + +TEST(NullColumnBuilder, Empty) { + std::shared_ptr<DataType> type = null(); + auto tg = TaskGroup::MakeSerial(); + + std::shared_ptr<ColumnBuilder> builder; + ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder)); + + std::shared_ptr<ChunkedArray> actual; + AssertBuilding(builder, {}, &actual); + + ChunkedArray expected({}, type); + AssertChunkedEqual(*actual, expected); +} + +TEST(NullColumnBuilder, InsertNull) { + // Building a column of nulls with type null() + std::shared_ptr<DataType> type = null(); + auto tg = TaskGroup::MakeSerial(); + + std::shared_ptr<ColumnBuilder> builder; + ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder)); + + std::shared_ptr<BlockParser> parser; + std::shared_ptr<ChunkedArray> actual, expected; + // Those values are indifferent, only the number of rows is used + MakeColumnParser({"456", "789"}, &parser); + builder->Insert(1, parser); + MakeColumnParser({"123"}, &parser); + builder->Insert(0, parser); + ASSERT_OK(builder->task_group()->Finish()); + ASSERT_OK(builder->Finish(&actual)); + ASSERT_OK(actual->Validate()); + + auto chunks = + ArrayVector{std::make_shared<NullArray>(1), std::make_shared<NullArray>(2)}; + expected = std::make_shared<ChunkedArray>(chunks); + AssertChunkedEqual(*actual, *expected); +} + +TEST(NullColumnBuilder, InsertTyped) { + // Building a column of nulls with another type + std::shared_ptr<DataType> type = int16(); + auto tg = TaskGroup::MakeSerial(); + + std::shared_ptr<ColumnBuilder> builder; + ASSERT_OK(ColumnBuilder::MakeNull(default_memory_pool(), type, tg, &builder)); + + std::shared_ptr<BlockParser> parser; + 
std::shared_ptr<ChunkedArray> actual, expected; + // Those values are indifferent, only the number of rows is used + MakeColumnParser({"abc", "def", "ghi"}, &parser); + builder->Insert(1, parser); + MakeColumnParser({"jkl"}, &parser); + builder->Insert(0, parser); + ASSERT_OK(builder->task_group()->Finish()); + ASSERT_OK(builder->Finish(&actual)); + ASSERT_OK(actual->Validate()); + + auto chunks = ArrayVector{ArrayFromJSON(type, "[null]"), + ArrayFromJSON(type, "[null, null, null]")}; + expected = std::make_shared<ChunkedArray>(chunks); + AssertChunkedEqual(*actual, *expected); } ////////////////////////////////////////////////////////////////////////// @@ -56,7 +125,8 @@ void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder, TEST(ColumnBuilder, Empty) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, + ConvertOptions::Defaults(), tg, &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {}, &actual); @@ -68,7 +138,8 @@ TEST(ColumnBuilder, Empty) { TEST(ColumnBuilder, Basics) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, + ConvertOptions::Defaults(), tg, &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"123", "-456"}}, &actual); @@ -82,7 +153,8 @@ TEST(ColumnBuilder, Insert) { // Test ColumnBuilder::Insert() auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, + ConvertOptions::Defaults(), tg, &builder)); std::shared_ptr<BlockParser> parser; std::shared_ptr<ChunkedArray> 
actual, expected; @@ -92,6 +164,7 @@ TEST(ColumnBuilder, Insert) { builder->Insert(0, parser); ASSERT_OK(builder->task_group()->Finish()); ASSERT_OK(builder->Finish(&actual)); + ASSERT_OK(actual->Validate()); ChunkedArrayFromVector<Int32Type>({{123}, {456}}, &expected); AssertChunkedEqual(*actual, *expected); @@ -100,7 +173,8 @@ TEST(ColumnBuilder, Insert) { TEST(ColumnBuilder, MultipleChunks) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, + ConvertOptions::Defaults(), tg, &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"1", "2", "3"}, {"4", "5"}}, &actual); @@ -113,7 +187,8 @@ TEST(ColumnBuilder, MultipleChunks) { TEST(ColumnBuilder, MultipleChunksParallel) { auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool()); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(int32(), 0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), int32(), 0, + ConvertOptions::Defaults(), tg, &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual); @@ -129,7 +204,8 @@ TEST(ColumnBuilder, MultipleChunksParallel) { TEST(InferringColumnBuilder, Empty) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {}, &actual); @@ -141,7 +217,8 @@ TEST(InferringColumnBuilder, Empty) { TEST(InferringColumnBuilder, SingleChunkNull) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + 
ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"", "NA"}}, &actual); @@ -153,7 +230,8 @@ TEST(InferringColumnBuilder, SingleChunkNull) { TEST(InferringColumnBuilder, MultipleChunkNull) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"", "NA"}, {""}, {"NaN"}}, &actual); @@ -165,7 +243,8 @@ TEST(InferringColumnBuilder, MultipleChunkNull) { TEST(InferringColumnBuilder, SingleChunkInteger) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"", "123", "456"}}, &actual); @@ -178,7 +257,8 @@ TEST(InferringColumnBuilder, SingleChunkInteger) { TEST(InferringColumnBuilder, MultipleChunkInteger) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{""}, {"NA", "123", "456"}}, &actual); @@ -192,7 +272,8 @@ TEST(InferringColumnBuilder, MultipleChunkInteger) { TEST(InferringColumnBuilder, SingleChunkBoolean) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + 
&builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"", "0", "FALSE"}}, &actual); @@ -206,7 +287,8 @@ TEST(InferringColumnBuilder, SingleChunkBoolean) { TEST(InferringColumnBuilder, MultipleChunkBoolean) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{""}, {"1", "True", "0"}}, &actual); @@ -220,7 +302,8 @@ TEST(InferringColumnBuilder, MultipleChunkBoolean) { TEST(InferringColumnBuilder, SingleChunkReal) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"", "0.0", "12.5"}}, &actual); @@ -234,7 +317,8 @@ TEST(InferringColumnBuilder, SingleChunkReal) { TEST(InferringColumnBuilder, MultipleChunkReal) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{""}, {"008"}, {"NaN", "12.5"}}, &actual); @@ -248,7 +332,8 @@ TEST(InferringColumnBuilder, MultipleChunkReal) { TEST(InferringColumnBuilder, SingleChunkTimestamp) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"", 
"1970-01-01", "2018-11-13 17:11:10"}}, &actual); @@ -263,7 +348,8 @@ TEST(InferringColumnBuilder, SingleChunkTimestamp) { TEST(InferringColumnBuilder, MultipleChunkTimestamp) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{""}, {"1970-01-01"}, {"2018-11-13 17:11:10"}}, &actual); @@ -282,7 +368,8 @@ TEST(InferringColumnBuilder, SingleChunkString) { std::shared_ptr<ChunkedArray> expected; // With valid UTF8 - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); AssertBuilding(builder, {{"", "foo", "baré"}}, &actual); ChunkedArrayFromVector<StringType, std::string>({{true, true, true}}, @@ -293,7 +380,7 @@ TEST(InferringColumnBuilder, SingleChunkString) { auto options = ConvertOptions::Defaults(); options.check_utf8 = false; tg = TaskGroup::MakeSerial(); - ASSERT_OK(ColumnBuilder::Make(0, options, tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual); ChunkedArrayFromVector<StringType, std::string>({{true, true, true}}, @@ -308,7 +395,8 @@ TEST(InferringColumnBuilder, SingleChunkBinary) { std::shared_ptr<ChunkedArray> expected; // With invalid UTF8, checking - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual); ChunkedArrayFromVector<BinaryType, std::string>({{true, true, true}}, @@ -319,7 +407,8 @@ TEST(InferringColumnBuilder, SingleChunkBinary) { TEST(InferringColumnBuilder, 
MultipleChunkString) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré"}}, &actual); @@ -333,7 +422,8 @@ TEST(InferringColumnBuilder, MultipleChunkString) { TEST(InferringColumnBuilder, MultipleChunkBinary) { auto tg = TaskGroup::MakeSerial(); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré\xff"}}, &actual); @@ -350,7 +440,8 @@ TEST(InferringColumnBuilder, MultipleChunkBinary) { TEST(InferringColumnBuilder, MultipleChunkIntegerParallel) { auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool()); std::shared_ptr<ColumnBuilder> builder; - ASSERT_OK(ColumnBuilder::Make(0, ConvertOptions::Defaults(), tg, &builder)); + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, ConvertOptions::Defaults(), tg, + &builder)); std::shared_ptr<ChunkedArray> actual; AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual); diff --git a/cpp/src/arrow/csv/column-builder.cc b/cpp/src/arrow/csv/column-builder.cc index cfc36fe..eff0088 100644 --- a/cpp/src/arrow/csv/column-builder.cc +++ b/cpp/src/arrow/csv/column-builder.cc @@ -25,9 +25,11 @@ #include <vector> #include "arrow/array.h" +#include "arrow/builder.h" #include "arrow/csv/column-builder.h" #include "arrow/csv/converter.h" #include "arrow/csv/options.h" +#include "arrow/csv/parser.h" #include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/table.h" @@ -51,6 +53,73 @@ void ColumnBuilder::Append(const std::shared_ptr<BlockParser>& 
parser) { } ////////////////////////////////////////////////////////////////////////// +// Null column builder implementation (for a column not in the CSV file) + +class NullColumnBuilder : public ColumnBuilder { + public: + explicit NullColumnBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool, + const std::shared_ptr<internal::TaskGroup>& task_group) + : ColumnBuilder(task_group), type_(type), pool_(pool) {} + + Status Init(); + + void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override; + Status Finish(std::shared_ptr<ChunkedArray>* out) override; + + // While NullColumnBuilder is so cheap that it doesn't need parallelization + // in itself, the CSV reader doesn't know this and can still call it from + // multiple threads, so use a mutex anyway. + std::mutex mutex_; + + std::shared_ptr<DataType> type_; + MemoryPool* pool_; + std::unique_ptr<ArrayBuilder> builder_; +}; + +Status NullColumnBuilder::Init() { return MakeBuilder(pool_, type_, &builder_); } + +void NullColumnBuilder::Insert(int64_t block_index, + const std::shared_ptr<BlockParser>& parser) { + // Create a null Array pointer at the back of the list. 
+ size_t chunk_index = static_cast<size_t>(block_index); + { + std::lock_guard<std::mutex> lock(mutex_); + if (chunks_.size() <= chunk_index) { + chunks_.resize(chunk_index + 1); + } + } + + // Spawn a task that will build an array of nulls with the right DataType + const int32_t num_rows = parser->num_rows(); + DCHECK_GE(num_rows, 0); + + task_group_->Append([=]() -> Status { + std::shared_ptr<Array> res; + RETURN_NOT_OK(builder_->AppendNulls(num_rows)); + RETURN_NOT_OK(builder_->Finish(&res)); + + std::lock_guard<std::mutex> lock(mutex_); + // Should not insert an already built chunk + DCHECK_EQ(chunks_[chunk_index], nullptr); + chunks_[chunk_index] = std::move(res); + return Status::OK(); + }); +} + +Status NullColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) { + // Unnecessary iff all tasks have finished + std::lock_guard<std::mutex> lock(mutex_); + + for (const auto& chunk : chunks_) { + if (chunk == nullptr) { + return Status::Invalid("a chunk failed allocating for an unknown reason"); + } + } + *out = std::make_shared<ChunkedArray>(chunks_, type_); + return Status::OK(); +} + +////////////////////////////////////////////////////////////////////////// // Pre-typed column builder implementation class TypedColumnBuilder : public ColumnBuilder { @@ -355,28 +424,37 @@ Status InferringColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) { ////////////////////////////////////////////////////////////////////////// // Factory functions -Status ColumnBuilder::Make(const std::shared_ptr<DataType>& type, int32_t col_index, - const ConvertOptions& options, +Status ColumnBuilder::Make(MemoryPool* pool, const std::shared_ptr<DataType>& type, + int32_t col_index, const ConvertOptions& options, const std::shared_ptr<TaskGroup>& task_group, std::shared_ptr<ColumnBuilder>* out) { - auto ptr = - new TypedColumnBuilder(type, col_index, options, default_memory_pool(), task_group); + auto ptr = new TypedColumnBuilder(type, col_index, options, pool, task_group); auto 
res = std::shared_ptr<ColumnBuilder>(ptr); RETURN_NOT_OK(ptr->Init()); *out = res; return Status::OK(); } -Status ColumnBuilder::Make(int32_t col_index, const ConvertOptions& options, +Status ColumnBuilder::Make(MemoryPool* pool, int32_t col_index, + const ConvertOptions& options, const std::shared_ptr<TaskGroup>& task_group, std::shared_ptr<ColumnBuilder>* out) { - auto ptr = - new InferringColumnBuilder(col_index, options, default_memory_pool(), task_group); + // XXX + auto ptr = new InferringColumnBuilder(col_index, options, pool, task_group); auto res = std::shared_ptr<ColumnBuilder>(ptr); RETURN_NOT_OK(ptr->Init()); *out = res; return Status::OK(); } +Status ColumnBuilder::MakeNull(MemoryPool* pool, const std::shared_ptr<DataType>& type, + const std::shared_ptr<internal::TaskGroup>& task_group, + std::shared_ptr<ColumnBuilder>* out) { + auto res = std::make_shared<NullColumnBuilder>(type, pool, task_group); + RETURN_NOT_OK(res->Init()); + *out = std::move(res); + return Status::OK(); +} + } // namespace csv } // namespace arrow diff --git a/cpp/src/arrow/csv/column-builder.h b/cpp/src/arrow/csv/column-builder.h index 054a642..789e7b9 100644 --- a/cpp/src/arrow/csv/column-builder.h +++ b/cpp/src/arrow/csv/column-builder.h @@ -63,16 +63,22 @@ class ARROW_EXPORT ColumnBuilder { std::shared_ptr<internal::TaskGroup> task_group() { return task_group_; } /// Construct a strictly-typed ColumnBuilder. - static Status Make(const std::shared_ptr<DataType>& type, int32_t col_index, - const ConvertOptions& options, + static Status Make(MemoryPool* pool, const std::shared_ptr<DataType>& type, + int32_t col_index, const ConvertOptions& options, const std::shared_ptr<internal::TaskGroup>& task_group, std::shared_ptr<ColumnBuilder>* out); /// Construct a type-inferring ColumnBuilder. 
- static Status Make(int32_t col_index, const ConvertOptions& options, + static Status Make(MemoryPool* pool, int32_t col_index, const ConvertOptions& options, const std::shared_ptr<internal::TaskGroup>& task_group, std::shared_ptr<ColumnBuilder>* out); + /// Construct a ColumnBuilder for a column of nulls + /// (i.e. not present in the CSV file). + static Status MakeNull(MemoryPool* pool, const std::shared_ptr<DataType>& type, + const std::shared_ptr<internal::TaskGroup>& task_group, + std::shared_ptr<ColumnBuilder>* out); + protected: explicit ColumnBuilder(const std::shared_ptr<internal::TaskGroup>& task_group) : task_group_(task_group) {} diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 21d0ab2..846c2b8 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -73,6 +73,19 @@ struct ARROW_EXPORT ConvertOptions { // If false, then all strings are valid string values. bool strings_can_be_null = false; + // XXX Should we have a separate FilterOptions? + + // If non-empty, indicates the names of columns from the CSV file that should + // be actually read and converted (in the vector's order). + // Columns not in this vector will be ignored. + std::vector<std::string> include_columns; + // If false, columns in `include_columns` but not in the CSV file will error out. + // If true, columns in `include_columns` but not in the CSV file will produce + // a column of nulls (whose type is selected using `column_types`, + // or null by default) + // This option is ignored if `include_columns` is empty. 
+ bool include_missing_columns = false; + static ConvertOptions Defaults(); }; diff --git a/cpp/src/arrow/csv/parser.cc b/cpp/src/arrow/csv/parser.cc index d6454ed..8d085dc 100644 --- a/cpp/src/arrow/csv/parser.cc +++ b/cpp/src/arrow/csv/parser.cc @@ -513,7 +513,11 @@ Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_si BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols, int32_t max_num_rows) - : pool_(pool), options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {} + : pool_(pool), + options_(options), + num_rows_(-1), + num_cols_(num_cols), + max_num_rows_(max_num_rows) {} BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows) : BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {} diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 1ef2d39..1bccc9b 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -163,7 +163,7 @@ class BaseTableReader : public csv::TableReader { if (read_options_.column_names.empty()) { // Read one row with column names - BlockParser parser(pool_, parse_options_, num_cols_, 1); + BlockParser parser(pool_, parse_options_, num_csv_cols_, 1); uint32_t parsed_size = 0; RETURN_NOT_OK(parser.Parse(reinterpret_cast<const char*>(cur_data_), static_cast<uint32_t>(cur_size_), &parsed_size)); @@ -189,27 +189,88 @@ class BaseTableReader : public csv::TableReader { column_names_ = read_options_.column_names; } - num_cols_ = static_cast<int32_t>(column_names_.size()); - DCHECK_GT(num_cols_, 0); + num_csv_cols_ = static_cast<int32_t>(column_names_.size()); + DCHECK_GT(num_csv_cols_, 0); - // Construct column builders - for (int32_t col_index = 0; col_index < num_cols_; ++col_index) { + if (convert_options_.include_columns.empty()) { + return MakeColumnBuilders(); + } else { + return MakeColumnBuilders(convert_options_.include_columns); + } + } + + // Make column builders, assuming 
inclusion of all columns in CSV file order + Status MakeColumnBuilders() { + for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) { std::shared_ptr<ColumnBuilder> builder; - // Does the named column have a fixed type? - auto it = convert_options_.column_types.find(column_names_[col_index]); - if (it == convert_options_.column_types.end()) { - RETURN_NOT_OK( - ColumnBuilder::Make(col_index, convert_options_, task_group_, &builder)); + const auto& col_name = column_names_[col_index]; + + RETURN_NOT_OK(MakeCSVColumnBuilder(col_name, col_index, &builder)); + column_builders_.push_back(builder); + builder_names_.push_back(col_name); + } + return Status::OK(); + } + + // Make column builders, assuming inclusion of columns in `include_columns` order + Status MakeColumnBuilders(const std::vector<std::string>& include_columns) { + // Compute indices of columns in the CSV file + std::unordered_map<std::string, int32_t> col_indices; + col_indices.reserve(column_names_.size()); + for (int32_t i = 0; i < static_cast<int32_t>(column_names_.size()); ++i) { + col_indices.emplace(column_names_[i], i); + } + + // For each column name in include_columns, build the corresponding ColumnBuilder + for (const auto& col_name : include_columns) { + std::shared_ptr<ColumnBuilder> builder; + auto it = col_indices.find(col_name); + if (it != col_indices.end()) { + auto col_index = it->second; + RETURN_NOT_OK(MakeCSVColumnBuilder(col_name, col_index, &builder)); } else { - RETURN_NOT_OK(ColumnBuilder::Make(it->second, col_index, convert_options_, - task_group_, &builder)); + // Column not in the CSV file + if (convert_options_.include_missing_columns) { + RETURN_NOT_OK(MakeNullColumnBuilder(col_name, &builder)); + } else { + return Status::KeyError("Column '", col_name, + "' in include_columns " + "does not exist in CSV file"); + } } column_builders_.push_back(builder); + builder_names_.push_back(col_name); } - return Status::OK(); } + // Make a column builder for the given CSV 
column name and index + Status MakeCSVColumnBuilder(const std::string& col_name, int32_t col_index, + std::shared_ptr<ColumnBuilder>* out) { + // Does the named column have a fixed type? + auto it = convert_options_.column_types.find(col_name); + if (it == convert_options_.column_types.end()) { + return ColumnBuilder::Make(pool_, col_index, convert_options_, task_group_, out); + } else { + return ColumnBuilder::Make(pool_, it->second, col_index, convert_options_, + task_group_, out); + } + } + + // Make a column builder for a column of nulls + Status MakeNullColumnBuilder(const std::string& col_name, + std::shared_ptr<ColumnBuilder>* out) { + std::shared_ptr<DataType> type; + // If the named column has a fixed type, use it, otherwise use null() + auto it = convert_options_.column_types.find(col_name); + if (it != convert_options_.column_types.end()) { + type = it->second; + } else { + type = null(); + } + return ColumnBuilder::MakeNull(pool_, type, task_group_, out); + } + // Trigger conversion of parsed block data Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) { for (auto& builder : column_builders_) { @@ -219,17 +280,15 @@ class BaseTableReader : public csv::TableReader { } Status MakeTable(std::shared_ptr<Table>* out) { - DCHECK_GT(num_cols_, 0); - DCHECK_EQ(column_names_.size(), static_cast<uint32_t>(num_cols_)); - DCHECK_EQ(column_builders_.size(), static_cast<uint32_t>(num_cols_)); + DCHECK_EQ(column_builders_.size(), builder_names_.size()); std::vector<std::shared_ptr<Field>> fields; std::vector<std::shared_ptr<ChunkedArray>> columns; - for (int32_t i = 0; i < num_cols_; ++i) { + for (int32_t i = 0; i < static_cast<int32_t>(builder_names_.size()); ++i) { std::shared_ptr<ChunkedArray> array; RETURN_NOT_OK(column_builders_[i]->Finish(&array)); - fields.push_back(::arrow::field(column_names_[i], array->type())); + fields.push_back(::arrow::field(builder_names_[i], array->type())); columns.emplace_back(std::move(array)); } 
*out = Table::Make(schema(fields), columns); @@ -241,12 +300,17 @@ class BaseTableReader : public csv::TableReader { ParseOptions parse_options_; ConvertOptions convert_options_; - int32_t num_cols_ = -1; - std::shared_ptr<ReadaheadSpooler> readahead_; - // Column names + // Number of columns in the CSV file + int32_t num_csv_cols_ = -1; + // Column names in the CSV file std::vector<std::string> column_names_; - std::shared_ptr<internal::TaskGroup> task_group_; + // Column builders for target Table (not necessarily in CSV file order) std::vector<std::shared_ptr<ColumnBuilder>> column_builders_; + // Names of columns, in same order as column_builders_ + std::vector<std::string> builder_names_; + + std::shared_ptr<ReadaheadSpooler> readahead_; + std::shared_ptr<internal::TaskGroup> task_group_; // Current block and data pointer std::shared_ptr<Buffer> cur_block_; @@ -289,7 +353,7 @@ class SerialTableReader : public BaseTableReader { static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max(); auto parser = - std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows); + std::make_shared<BlockParser>(pool_, parse_options_, num_csv_cols_, max_num_rows); while (!eof_) { // Consume current block uint32_t parsed_size = 0; @@ -376,8 +440,8 @@ class ThreadedTableReader : public BaseTableReader { // "mutable" allows to modify captured by-copy chunk_buffer task_group_->Append([=]() mutable -> Status { - auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, - max_num_rows); + auto parser = std::make_shared<BlockParser>(pool_, parse_options_, + num_csv_cols_, max_num_rows); uint32_t parsed_size = 0; RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(chunk_data), chunk_size, &parsed_size)); @@ -409,8 +473,8 @@ class ThreadedTableReader : public BaseTableReader { for (auto& builder : column_builders_) { builder->SetTaskGroup(task_group_); } - auto parser = - std::make_shared<BlockParser>(pool_, parse_options_, 
num_cols_, max_num_rows); + auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_csv_cols_, + max_num_rows); uint32_t parsed_size = 0; RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_), static_cast<uint32_t>(cur_size_), &parsed_size)); diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h index edf6f11..53255f9 100644 --- a/cpp/src/arrow/csv/reader.h +++ b/cpp/src/arrow/csv/reader.h @@ -41,7 +41,6 @@ class ARROW_EXPORT TableReader { virtual Status Read(std::shared_ptr<Table>* out) = 0; - // XXX pass optional schema? static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input, const ReadOptions&, const ParseOptions&, const ConvertOptions&, std::shared_ptr<TableReader>* out); diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index 446010f..3f68561 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -21,6 +21,7 @@ #include <cstdlib> #include <limits> #include <memory> +#include <sstream> #include <utility> #include "arrow/array.h" @@ -184,10 +185,12 @@ Status ChunkedArray::Validate() const { } const auto& type = *chunks_[0]->type(); + // Make sure chunks all have the same type for (size_t i = 1; i < chunks_.size(); ++i) { - if (!chunks_[i]->type()->Equals(type)) { + const Array& chunk = *chunks_[i]; + if (!chunk.type()->Equals(type)) { return Status::Invalid("In chunk ", i, " expected type ", type.ToString(), - " but saw ", chunks_[i]->type()->ToString()); + " but saw ", chunk.type()->ToString()); } } return Status::OK(); @@ -343,7 +346,7 @@ class SimpleTable : public Table { } } - // Make sure columns are all the same length + // Make sure columns are all the same length, and validate them for (int i = 0; i < num_columns(); ++i) { const ChunkedArray* col = columns_[i].get(); if (col->length() != num_rows_) { @@ -351,6 +354,12 @@ class SimpleTable : public Table { " expected length ", num_rows_, " but got length ", col->length()); } + Status st = col->Validate(); + if 
(!st.ok()) { + std::stringstream ss; + ss << "Column " << i << ": " << st.message(); + return st.WithMessage(ss.str()); + } } return Status::OK(); } diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 93e9cb3..cb5fe18 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -55,7 +55,7 @@ cdef class ReadOptions: The number of rows to skip at the start of the CSV data, not including the row of column names (if any). column_names: list, optional - The Table column names. If empty, column names will be + The column names in the CSV file. If empty, column names will be read from the first row after `skip_rows`. """ cdef: @@ -115,7 +115,7 @@ cdef class ReadOptions: @property def column_names(self): """ - The Table column names. If empty, column names will be + The column names in the CSV file. If empty, column names will be read from the first row after `skip_rows`. """ return [frombytes(s) for s in self.options.column_names] @@ -288,6 +288,17 @@ cdef class ConvertOptions: If true, then strings in null_values are considered null for string columns. If false, then all strings are valid string values. + include_columns: list, optional + The names of columns to include in the Table. + If empty, the Table will include all columns from the CSV file. + If not empty, only these columns will be included, in this order. + include_missing_columns: bool, optional (default False) + If false, columns in `include_columns` but not in the CSV file will + error out. + If true, columns in `include_columns` but not in the CSV file will + produce a column of nulls (whose type is selected using + `column_types`, or null by default). + This option is ignored if `include_columns` is empty. 
""" cdef: CCSVConvertOptions options @@ -297,7 +308,8 @@ cdef class ConvertOptions: def __init__(self, check_utf8=None, column_types=None, null_values=None, true_values=None, false_values=None, - strings_can_be_null=None): + strings_can_be_null=None, include_columns=None, + include_missing_columns=None): self.options = CCSVConvertOptions.Defaults() if check_utf8 is not None: self.check_utf8 = check_utf8 @@ -311,6 +323,10 @@ cdef class ConvertOptions: self.false_values = false_values if strings_can_be_null is not None: self.strings_can_be_null = strings_can_be_null + if include_columns is not None: + self.include_columns = include_columns + if include_missing_columns is not None: + self.include_missing_columns = include_missing_columns @property def check_utf8(self): @@ -396,6 +412,38 @@ cdef class ConvertOptions: def false_values(self, value): self.options.false_values = [tobytes(x) for x in value] + @property + def include_columns(self): + """ + The names of columns to include in the Table. + + If empty, the Table will include all columns from the CSV file. + If not empty, only these columns will be included, in this order. + """ + return [frombytes(s) for s in self.options.include_columns] + + @include_columns.setter + def include_columns(self, value): + self.options.include_columns.clear() + for item in value: + self.options.include_columns.push_back(tobytes(item)) + + @property + def include_missing_columns(self): + """ + If false, columns in `include_columns` but not in the CSV file will + error out. + If true, columns in `include_columns` but not in the CSV file will + produce a null column (whose type is selected using `column_types`, + or null by default). + This option is ignored if `include_columns` is empty. 
+ """ + return self.options.include_missing_columns + + @include_missing_columns.setter + def include_missing_columns(self, value): + self.options.include_missing_columns = value + cdef _get_reader(input_file, shared_ptr[InputStream]* out): use_memory_map = False diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi index 3cb9142..6c64f9b 100644 --- a/python/pyarrow/error.pxi +++ b/python/pyarrow/error.pxi @@ -37,7 +37,9 @@ class ArrowIOError(IOError, ArrowException): class ArrowKeyError(KeyError, ArrowException): - pass + def __str__(self): + # Override KeyError.__str__, as it uses the repr() of the key + return ArrowException.__str__(self) class ArrowTypeError(TypeError, ArrowException): diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index ad0fa09..9c2aa6a 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1113,6 +1113,8 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: vector[c_string] true_values vector[c_string] false_values c_bool strings_can_be_null + vector[c_string] include_columns + c_bool include_missing_columns @staticmethod CCSVConvertOptions Defaults() diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index f103d26..fad7ab4 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -78,6 +78,13 @@ cdef class ChunkedArray(_PandasConvertible): def __str__(self): return self.format() + def validate(self): + """ + Validate chunked array consistency. 
+ """ + with nogil: + check_status(self.sp_chunked_array.get().Validate()) + @property def null_count(self): """ diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index a42237e..543f659 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -176,15 +176,27 @@ def test_convert_options(): opts.false_values = ['xxx', 'yyy'] assert opts.false_values == ['xxx', 'yyy'] + assert opts.include_columns == [] + opts.include_columns = ['def', 'abc'] + assert opts.include_columns == ['def', 'abc'] + + assert opts.include_missing_columns is False + opts.include_missing_columns = True + assert opts.include_missing_columns is True + opts = cls(check_utf8=False, column_types={'a': pa.null()}, null_values=['N', 'nn'], true_values=['T', 'tt'], - false_values=['F', 'ff'], strings_can_be_null=True) + false_values=['F', 'ff'], strings_can_be_null=True, + include_columns=['abc', 'def'], + include_missing_columns=True) assert opts.check_utf8 is False assert opts.column_types == {'a': pa.null()} assert opts.null_values == ['N', 'nn'] assert opts.false_values == ['F', 'ff'] assert opts.true_values == ['T', 'tt'] assert opts.strings_can_be_null is True + assert opts.include_columns == ['abc', 'def'] + assert opts.include_missing_columns is True class BaseTestCSVRead: @@ -306,6 +318,80 @@ class BaseTestCSVRead: "y": ["kl", "op"], } + def test_include_columns(self): + rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" + + convert_options = ConvertOptions() + convert_options.include_columns = ['ab'] + table = self.read_bytes(rows, convert_options=convert_options) + self.check_names(table, ["ab"]) + assert table.to_pydict() == { + "ab": ["ef", "ij", "mn"], + } + + # Order of include_columns is respected, regardless of CSV order + convert_options.include_columns = ['cd', 'ab'] + table = self.read_bytes(rows, convert_options=convert_options) + schema = pa.schema([('cd', pa.string()), + ('ab', pa.string())]) + assert table.schema == schema + 
assert table.to_pydict() == { + "cd": ["gh", "kl", "op"], + "ab": ["ef", "ij", "mn"], + } + + # Include a column not in the CSV file => raises by default + convert_options.include_columns = ['xx', 'ab', 'yy'] + with pytest.raises(KeyError, + match="Column 'xx' in include_columns " + "does not exist in CSV file"): + self.read_bytes(rows, convert_options=convert_options) + + def test_include_missing_columns(self): + rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" + + read_options = ReadOptions() + convert_options = ConvertOptions() + convert_options.include_columns = ['xx', 'ab', 'yy'] + convert_options.include_missing_columns = True + table = self.read_bytes(rows, read_options=read_options, + convert_options=convert_options) + schema = pa.schema([('xx', pa.null()), + ('ab', pa.string()), + ('yy', pa.null())]) + assert table.schema == schema + assert table.to_pydict() == { + "xx": [None, None, None], + "ab": ["ef", "ij", "mn"], + "yy": [None, None, None], + } + + # Combining with `column_names` + read_options.column_names = ["xx", "yy"] + convert_options.include_columns = ["yy", "cd"] + table = self.read_bytes(rows, read_options=read_options, + convert_options=convert_options) + schema = pa.schema([('yy', pa.string()), + ('cd', pa.null())]) + assert table.schema == schema + assert table.to_pydict() == { + "yy": ["cd", "gh", "kl", "op"], + "cd": [None, None, None, None], + } + + # And with `column_types` as well + convert_options.column_types = {"yy": pa.binary(), + "cd": pa.int32()} + table = self.read_bytes(rows, read_options=read_options, + convert_options=convert_options) + schema = pa.schema([('yy', pa.binary()), + ('cd', pa.int32())]) + assert table.schema == schema + assert table.to_pydict() == { + "yy": [b"cd", b"gh", b"kl", b"op"], + "cd": [None, None, None, None], + } + def test_simple_ints(self): # Infer integer columns rows = b"a,b,c\n1,2,3\n4,5,6\n" @@ -471,6 +557,22 @@ class BaseTestCSVRead: assert "In CSV column #1: " in err assert "CSV conversion error to 
float: invalid value 'XXX'" in err + def test_column_types_with_column_names(self): + # When both `column_names` and `column_types` are given, names + # in `column_types` should refer to names in `column_names` + rows = b"a,b\nc,d\ne,f\n" + read_options = ReadOptions(column_names=['x', 'y']) + convert_options = ConvertOptions(column_types={'x': pa.binary()}) + table = self.read_bytes(rows, read_options=read_options, + convert_options=convert_options) + schema = pa.schema([('x', pa.binary()), + ('y', pa.string())]) + assert table.schema == schema + assert table.to_pydict() == { + 'x': [b'a', b'c', b'e'], + 'y': ['b', 'd', 'f'], + } + def test_no_ending_newline(self): # No \n after last line rows = b"a,b,c\n1,2,3\n4,5,6" diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 8014c19..64a9a08 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -30,6 +30,7 @@ def test_chunked_array_basics(): data = pa.chunked_array([], type=pa.string()) assert data.type == pa.string() assert data.to_pylist() == [] + data.validate() with pytest.raises(ValueError): pa.chunked_array([]) @@ -43,6 +44,7 @@ def test_chunked_array_basics(): assert all(isinstance(c, pa.lib.Int64Array) for c in data.chunks) assert all(isinstance(c, pa.lib.Int64Array) for c in data.iterchunks()) assert len(data.chunks) == 3 + data.validate() def test_chunked_array_mismatch_types(): @@ -177,7 +179,9 @@ def test_chunked_array_pickle(data, typ): arrays.append(pa.array(data[:2], type=typ)) data = data[2:] array = pa.chunked_array(arrays, type=typ) + array.validate() result = pickle.loads(pickle.dumps(array)) + result.validate() assert result.equals(array)