This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 3b61349 ARROW-2968: [R] Multi-threaded conversion from Arrow table to R data.frame 3b61349 is described below commit 3b61349b3c16d43003e493c7e2aec9348e7e7343 Author: Romain Francois <rom...@purrple.cat> AuthorDate: Wed Jan 9 22:00:12 2019 -0600 ARROW-2968: [R] Multi-threaded conversion from Arrow table to R data.frame The `as_tibble()` methods for `arrow::RecordBatch` and `arrow::Table` gained a `use_threads` argument. When set to `TRUE` columns of a record batch or table are converted to R vectors in parallel. We cannot allocate R data structures in parallel (including scalar strings), so it goes like this: ``` for each column: - allocate the R vector host for the array - if that can be done in parallel, fill the R vector with data from the array fill serially all columns that could not be filled in parallel wait for all columns to be full ``` This is I believe better (although perhaps harder to explain) than - allocate all the vectors - fill them in parallel Because we don't have to wait for all the vectors to be allocated to start filling them. 
I believe the python does that, in `DataFrameBlockCreator::Convert` ``` RETURN_NOT_OK(CreateBlocks()); RETURN_NOT_OK(WriteTableToBlocks()); ``` I've had to split the implementation of `Array__as_vector` into two steps: - Allocate: this must happen on the main thread, or alternatively would need to mutex R - Ingest: For most array types, this can be done in parallel Author: Romain Francois <rom...@purrple.cat> Closes #3332 from romainfrancois/2968/threads and squashes the following commits: 8261f2907 <Romain Francois> sprinkle use_threads in functions that call as_tibble() 3205de2d8 <Romain Francois> lint 590baf5a6 <Romain Francois> using string_view cd0dd343e <Romain Francois> no need for checkBuffers 29546cd5d <Romain Francois> Some more refactoring of the Converters 5557b7974 <Romain Francois> refactor the Converter api, so that all Converters are implementations of the base class Converter. e2ed26b78 <Romain Francois> lint 2a5815e03 <Romain Francois> moving parallel_ingest() to a static method of the Converter classes 2613d4ec4 <Romain Francois> null_count already local variable 62a842054 <Romain Francois> + to_r_index lambda, with comment about why +1 52c725fc8 <Romain Francois> default_value() marked constexpr 11e82e769 <Romain Francois> lint d22b9c551 <Romain Francois> parallel version of Table__to_dataframe 2455bd057 <Romain Francois> parallel version of RecordBatch__to_dataframe 380d3a5bc <Romain Francois> simplify ArrayVector__as_vector. 
85881a3e2 <Romain Francois> simplify ArrayVector_To_Vector 7074b36e9 <Romain Francois> reinstate Converter_Timestamp so that ArrayVector__as_vector can be simplified cf7e76bae <Romain Francois> + parallel_ingest<Converter>() to indicate if ingest for a givne converter can be doine in parallel baaaefe1b <Romain Francois> Re"work Converter api e650b7934 <Romain Francois> + arrow::r::inspect(SEXP) for debugging a335dfdfc <Romain Francois> Factor out Array -> R vector code in separate file 1212e28a9 <Romain Francois> <Converter>.Ingest() return an Invalid status instead of throwing an exception 39bf76403 <Romain Francois> <Converter>.Ingest() return a Status instead of void f68b79376 <Romain Francois> replaced DictionaryArrays_to_Vector and Converter_Dictionary_Int32Indices by Converter_Dictionary d25a0e6b5 <Romain Francois> replace Date32ArrayVector_to_Vector by Converter_Date32 85e48c0c7 <Romain Francois> lint 18b921e6f <Romain Francois> + Get/Set ThreadPoolCapacity --- r/NAMESPACE | 2 + r/R/RcppExports.R | 57 +- r/R/RecordBatch.R | 4 +- r/R/Table.R | 4 +- r/R/feather.R | 5 +- r/R/parquet.R | 5 +- r/R/read_table.R | 4 +- r/man/GetCpuThreadPoolCapacity.Rd | 18 + r/man/SetCpuThreadPoolCapacity.Rd | 17 + r/man/read_feather.Rd | 5 +- r/man/read_parquet.Rd | 4 +- r/man/read_table.Rd | 4 +- r/src/RcppExports.cpp | 120 ++-- r/src/array.cpp | 496 --------------- r/src/array__to_vector.cpp | 697 +++++++++++++++++++++ r/src/arrow_types.h | 12 +- r/src/recordbatch.cpp | 16 - r/src/symbols.cpp | 9 + r/src/table.cpp | 17 - r/src/threadpool.cpp | 44 ++ r/tests/testthat/test-RecordBatch.R | 1 - .../testthat/test-cputhreadpoolcapacity.R} | 25 +- 22 files changed, 942 insertions(+), 624 deletions(-) diff --git a/r/NAMESPACE b/r/NAMESPACE index f8f6384..7fd76c7 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -80,6 +80,7 @@ export(FeatherTableWriter) export(FileMode) export(FileOutputStream) export(FixedSizeBufferWriter) +export(GetCpuThreadPoolCapacity) export(MessageReader) 
export(MessageType) export(MockOutputStream) @@ -88,6 +89,7 @@ export(RecordBatchFileReader) export(RecordBatchFileWriter) export(RecordBatchStreamReader) export(RecordBatchStreamWriter) +export(SetCpuThreadPoolCapacity) export(StatusCode) export(TimeUnit) export(Type) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index c6fe871..51ed4ea 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -5,14 +5,6 @@ Array__from_vector <- function(x) { .Call(`_arrow_Array__from_vector`, x) } -Array__as_vector <- function(array) { - .Call(`_arrow_Array__as_vector`, array) -} - -ChunkedArray__as_vector <- function(chunked_array) { - .Call(`_arrow_ChunkedArray__as_vector`, chunked_array) -} - Array__Slice1 <- function(array, offset) { .Call(`_arrow_Array__Slice1`, array, offset) } @@ -81,6 +73,22 @@ DictionaryArray__dictionary <- function(array) { .Call(`_arrow_DictionaryArray__dictionary`, array) } +Array__as_vector <- function(array) { + .Call(`_arrow_Array__as_vector`, array) +} + +ChunkedArray__as_vector <- function(chunked_array) { + .Call(`_arrow_ChunkedArray__as_vector`, chunked_array) +} + +RecordBatch__to_dataframe <- function(batch, use_threads) { + .Call(`_arrow_RecordBatch__to_dataframe`, batch, use_threads) +} + +Table__to_dataframe <- function(table, use_threads) { + .Call(`_arrow_Table__to_dataframe`, table, use_threads) +} + ArrayData__get_type <- function(x) { .Call(`_arrow_ArrayData__get_type`, x) } @@ -661,10 +669,6 @@ RecordBatch__column <- function(batch, i) { .Call(`_arrow_RecordBatch__column`, batch, i) } -RecordBatch__to_dataframe <- function(batch) { - .Call(`_arrow_RecordBatch__to_dataframe`, batch) -} - RecordBatch__from_dataframe <- function(tbl) { .Call(`_arrow_RecordBatch__from_dataframe`, tbl) } @@ -781,10 +785,6 @@ Table__schema <- function(x) { .Call(`_arrow_Table__schema`, x) } -Table__to_dataframe <- function(table) { - .Call(`_arrow_Table__to_dataframe`, table) -} - Table__column <- function(table, i) { .Call(`_arrow_Table__column`, 
table, i) } @@ -793,3 +793,28 @@ Table__columns <- function(table) { .Call(`_arrow_Table__columns`, table) } +#' Get the capacity of the global thread pool +#' +#' @return the number of worker threads in the thread pool to which +#' Arrow dispatches various CPU-bound tasks. This is an ideal number, +#' not necessarily the exact number of threads at a given point in time. +#' +#' You can change this number using [SetCpuThreadPoolCapacity()]. +#' +#' @export +GetCpuThreadPoolCapacity <- function() { + .Call(`_arrow_GetCpuThreadPoolCapacity`) +} + +#' Set the capacity of the global thread pool +#' +#' @param threads the number of worker threads int the thread pool to which +#' Arrow dispatches various CPU-bound tasks. +#' +#' The current number is returned by [GetCpuThreadPoolCapacity()] +#' +#' @export +SetCpuThreadPoolCapacity <- function(threads) { + invisible(.Call(`_arrow_SetCpuThreadPoolCapacity`, threads)) +} + diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R index fed10ab..9872117 100644 --- a/r/R/RecordBatch.R +++ b/r/R/RecordBatch.R @@ -80,8 +80,8 @@ } #' @export -`as_tibble.arrow::RecordBatch` <- function(x, ...){ - RecordBatch__to_dataframe(x) +`as_tibble.arrow::RecordBatch` <- function(x, use_threads = TRUE, ...){ + RecordBatch__to_dataframe(x, use_threads = use_threads) } #' Create an [arrow::RecordBatch][arrow__RecordBatch] from a data frame diff --git a/r/R/Table.R b/r/R/Table.R index 8972634..c39fce2 100644 --- a/r/R/Table.R +++ b/r/R/Table.R @@ -61,6 +61,6 @@ table <- function(.data){ } #' @export -`as_tibble.arrow::Table` <- function(x, ...){ - Table__to_dataframe(x) +`as_tibble.arrow::Table` <- function(x, use_threads = TRUE, ...){ + Table__to_dataframe(x, use_threads = use_threads) } diff --git a/r/R/feather.R b/r/R/feather.R index 0646521..eaeea4c 100644 --- a/r/R/feather.R +++ b/r/R/feather.R @@ -154,15 +154,16 @@ FeatherTableReader.fs_path <- function(file, mmap = TRUE, ...) 
{ #' @param file a arrow::ipc::feather::TableReader or whatever the [FeatherTableReader()] function can handle #' @param columns names if the columns to read. The default `NULL` means all columns #' @param as_tibble should the [arrow::Table][arrow__Table] be converted to a tibble. +#' @param use_threads Use threads when converting to a tibble. #' @param ... additional parameters #' #' @return a data frame if `as_tibble` is `TRUE` (the default), or a [arrow::Table][arrow__Table] otherwise #' #' @export -read_feather <- function(file, columns = NULL, as_tibble = TRUE, ...){ +read_feather <- function(file, columns = NULL, as_tibble = TRUE, use_threads = TRUE, ...){ out <- FeatherTableReader(file, ...)$Read(columns) if (isTRUE(as_tibble)) { - out <- as_tibble(out) + out <- as_tibble(out, use_threads = use_threads) } out } diff --git a/r/R/parquet.R b/r/R/parquet.R index 141da7b..6a393e2 100644 --- a/r/R/parquet.R +++ b/r/R/parquet.R @@ -19,15 +19,16 @@ #' #' @param file a file path #' @param as_tibble should the [arrow::Table][arrow__Table] be converted to a tibble. +#' @param use_threads Use threads when converting to a tibble, only relevant if `as_tibble` is `TRUE` #' @param ... currently ignored #' #' @return a [arrow::Table][arrow__Table], or a data frame if `as_tibble` is `TRUE`. #' #' @export -read_parquet <- function(file, as_tibble = TRUE, ...) { +read_parquet <- function(file, as_tibble = TRUE, use_threads = TRUE, ...) 
{ tab <- shared_ptr(`arrow::Table`, read_parquet_file(f)) if (isTRUE(as_tibble)) { - tab <- as_tibble(tab) + tab <- as_tibble(tab, use_threads = use_threads) } tab } diff --git a/r/R/read_table.R b/r/R/read_table.R index a540a42..260c50f 100644 --- a/r/R/read_table.R +++ b/r/R/read_table.R @@ -33,6 +33,8 @@ #' #' - a raw vector: read using a [arrow::ipc::RecordBatchStreamReader][arrow__ipc__RecordBatchStreamReader] #' +#' @param use_threads Use threads when converting to a tibble +#' #' @return #' #' - `read_table` returns an [arrow::Table][arrow__Table] @@ -81,6 +83,6 @@ read_table.fs_path <- function(stream) { #' @rdname read_table #' @export -read_arrow <- function(stream){ +read_arrow <- function(stream, use_threads = TRUE){ as_tibble(read_table(stream)) } diff --git a/r/man/GetCpuThreadPoolCapacity.Rd b/r/man/GetCpuThreadPoolCapacity.Rd new file mode 100644 index 0000000..8bf0a6f --- /dev/null +++ b/r/man/GetCpuThreadPoolCapacity.Rd @@ -0,0 +1,18 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/RcppExports.R +\name{GetCpuThreadPoolCapacity} +\alias{GetCpuThreadPoolCapacity} +\title{Get the capacity of the global thread pool} +\usage{ +GetCpuThreadPoolCapacity() +} +\value{ +the number of worker threads in the thread pool to which +Arrow dispatches various CPU-bound tasks. This is an ideal number, +not necessarily the exact number of threads at a given point in time. + +You can change this number using \code{\link[=SetCpuThreadPoolCapacity]{SetCpuThreadPoolCapacity()}}. 
+} +\description{ +Get the capacity of the global thread pool +} diff --git a/r/man/SetCpuThreadPoolCapacity.Rd b/r/man/SetCpuThreadPoolCapacity.Rd new file mode 100644 index 0000000..3a06dd5 --- /dev/null +++ b/r/man/SetCpuThreadPoolCapacity.Rd @@ -0,0 +1,17 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/RcppExports.R +\name{SetCpuThreadPoolCapacity} +\alias{SetCpuThreadPoolCapacity} +\title{Set the capacity of the global thread pool} +\usage{ +SetCpuThreadPoolCapacity(threads) +} +\arguments{ +\item{threads}{the number of worker threads int the thread pool to which +Arrow dispatches various CPU-bound tasks. + +The current number is returned by \code{\link[=GetCpuThreadPoolCapacity]{GetCpuThreadPoolCapacity()}}} +} +\description{ +Set the capacity of the global thread pool +} diff --git a/r/man/read_feather.Rd b/r/man/read_feather.Rd index 31fd36a..4509c7d 100644 --- a/r/man/read_feather.Rd +++ b/r/man/read_feather.Rd @@ -4,7 +4,8 @@ \alias{read_feather} \title{Read a feather file} \usage{ -read_feather(file, columns = NULL, as_tibble = TRUE, ...) +read_feather(file, columns = NULL, as_tibble = TRUE, + use_threads = TRUE, ...) } \arguments{ \item{file}{a arrow::ipc::feather::TableReader or whatever the \code{\link[=FeatherTableReader]{FeatherTableReader()}} function can handle} @@ -13,6 +14,8 @@ read_feather(file, columns = NULL, as_tibble = TRUE, ...) \item{as_tibble}{should the \link[=arrow__Table]{arrow::Table} be converted to a tibble.} +\item{use_threads}{Use threads when converting to a tibble.} + \item{...}{additional parameters} } \value{ diff --git a/r/man/read_parquet.Rd b/r/man/read_parquet.Rd index c29e18b..a4f294b 100644 --- a/r/man/read_parquet.Rd +++ b/r/man/read_parquet.Rd @@ -4,13 +4,15 @@ \alias{read_parquet} \title{Read parquet file from disk} \usage{ -read_parquet(file, as_tibble = TRUE, ...) +read_parquet(file, as_tibble = TRUE, use_threads = TRUE, ...) 
} \arguments{ \item{file}{a file path} \item{as_tibble}{should the \link[=arrow__Table]{arrow::Table} be converted to a tibble.} +\item{use_threads}{Use threads when converting to a tibble, only relevant if \code{as_tibble} is \code{TRUE}} + \item{...}{currently ignored} } \value{ diff --git a/r/man/read_table.Rd b/r/man/read_table.Rd index 3231b26..356ec5e 100644 --- a/r/man/read_table.Rd +++ b/r/man/read_table.Rd @@ -7,7 +7,7 @@ \usage{ read_table(stream) -read_arrow(stream) +read_arrow(stream, use_threads = TRUE) } \arguments{ \item{stream}{stream. @@ -23,6 +23,8 @@ binary file format, and uses a \link[=arrow__ipc__RecordBatchFileReader]{arrow:: to process it. \item a raw vector: read using a \link[=arrow__ipc__RecordBatchStreamReader]{arrow::ipc::RecordBatchStreamReader} }} + +\item{use_threads}{Use threads when converting to a tibble} } \value{ \itemize{ diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 1e8fed1..a31c401 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -17,28 +17,6 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// Array__as_vector -SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array); -RcppExport SEXP _arrow_Array__as_vector(SEXP arraySEXP) { -BEGIN_RCPP - Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Array>& >::type array(arraySEXP); - rcpp_result_gen = Rcpp::wrap(Array__as_vector(array)); - return rcpp_result_gen; -END_RCPP -} -// ChunkedArray__as_vector -SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array); -RcppExport SEXP _arrow_ChunkedArray__as_vector(SEXP chunked_arraySEXP) { -BEGIN_RCPP - Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< const std::shared_ptr<arrow::ChunkedArray>& >::type chunked_array(chunked_arraySEXP); - rcpp_result_gen = Rcpp::wrap(ChunkedArray__as_vector(chunked_array)); - return rcpp_result_gen; 
-END_RCPP -} // Array__Slice1 std::shared_ptr<arrow::Array> Array__Slice1(const std::shared_ptr<arrow::Array>& array, int offset); RcppExport SEXP _arrow_Array__Slice1(SEXP arraySEXP, SEXP offsetSEXP) { @@ -237,6 +215,52 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// Array__as_vector +SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array); +RcppExport SEXP _arrow_Array__as_vector(SEXP arraySEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Array>& >::type array(arraySEXP); + rcpp_result_gen = Rcpp::wrap(Array__as_vector(array)); + return rcpp_result_gen; +END_RCPP +} +// ChunkedArray__as_vector +SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array); +RcppExport SEXP _arrow_ChunkedArray__as_vector(SEXP chunked_arraySEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr<arrow::ChunkedArray>& >::type chunked_array(chunked_arraySEXP); + rcpp_result_gen = Rcpp::wrap(ChunkedArray__as_vector(chunked_array)); + return rcpp_result_gen; +END_RCPP +} +// RecordBatch__to_dataframe +List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch, bool use_threads); +RcppExport SEXP _arrow_RecordBatch__to_dataframe(SEXP batchSEXP, SEXP use_threadsSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>& >::type batch(batchSEXP); + Rcpp::traits::input_parameter< bool >::type use_threads(use_threadsSEXP); + rcpp_result_gen = Rcpp::wrap(RecordBatch__to_dataframe(batch, use_threads)); + return rcpp_result_gen; +END_RCPP +} +// Table__to_dataframe +List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table, bool use_threads); +RcppExport SEXP _arrow_Table__to_dataframe(SEXP tableSEXP, SEXP use_threadsSEXP) { 
+BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& >::type table(tableSEXP); + Rcpp::traits::input_parameter< bool >::type use_threads(use_threadsSEXP); + rcpp_result_gen = Rcpp::wrap(Table__to_dataframe(table, use_threads)); + return rcpp_result_gen; +END_RCPP +} // ArrayData__get_type std::shared_ptr<arrow::DataType> ArrayData__get_type(const std::shared_ptr<arrow::ArrayData>& x); RcppExport SEXP _arrow_ArrayData__get_type(SEXP xSEXP) { @@ -1846,17 +1870,6 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// RecordBatch__to_dataframe -List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch); -RcppExport SEXP _arrow_RecordBatch__to_dataframe(SEXP batchSEXP) { -BEGIN_RCPP - Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>& >::type batch(batchSEXP); - rcpp_result_gen = Rcpp::wrap(RecordBatch__to_dataframe(batch)); - return rcpp_result_gen; -END_RCPP -} // RecordBatch__from_dataframe std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl); RcppExport SEXP _arrow_RecordBatch__from_dataframe(SEXP tblSEXP) { @@ -2185,17 +2198,6 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// Table__to_dataframe -List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table); -RcppExport SEXP _arrow_Table__to_dataframe(SEXP tableSEXP) { -BEGIN_RCPP - Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& >::type table(tableSEXP); - rcpp_result_gen = Rcpp::wrap(Table__to_dataframe(table)); - return rcpp_result_gen; -END_RCPP -} // Table__column std::shared_ptr<arrow::Column> Table__column(const std::shared_ptr<arrow::Table>& table, int i); RcppExport SEXP _arrow_Table__column(SEXP tableSEXP, SEXP iSEXP) { @@ -2219,11 +2221,29 @@ BEGIN_RCPP return 
rcpp_result_gen; END_RCPP } +// GetCpuThreadPoolCapacity +int GetCpuThreadPoolCapacity(); +RcppExport SEXP _arrow_GetCpuThreadPoolCapacity() { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + rcpp_result_gen = Rcpp::wrap(GetCpuThreadPoolCapacity()); + return rcpp_result_gen; +END_RCPP +} +// SetCpuThreadPoolCapacity +void SetCpuThreadPoolCapacity(int threads); +RcppExport SEXP _arrow_SetCpuThreadPoolCapacity(SEXP threadsSEXP) { +BEGIN_RCPP + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< int >::type threads(threadsSEXP); + SetCpuThreadPoolCapacity(threads); + return R_NilValue; +END_RCPP +} static const R_CallMethodDef CallEntries[] = { {"_arrow_Array__from_vector", (DL_FUNC) &_arrow_Array__from_vector, 1}, - {"_arrow_Array__as_vector", (DL_FUNC) &_arrow_Array__as_vector, 1}, - {"_arrow_ChunkedArray__as_vector", (DL_FUNC) &_arrow_ChunkedArray__as_vector, 1}, {"_arrow_Array__Slice1", (DL_FUNC) &_arrow_Array__Slice1, 2}, {"_arrow_Array__Slice2", (DL_FUNC) &_arrow_Array__Slice2, 3}, {"_arrow_Array__IsNull", (DL_FUNC) &_arrow_Array__IsNull, 2}, @@ -2241,6 +2261,10 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_Array__Mask", (DL_FUNC) &_arrow_Array__Mask, 1}, {"_arrow_DictionaryArray__indices", (DL_FUNC) &_arrow_DictionaryArray__indices, 1}, {"_arrow_DictionaryArray__dictionary", (DL_FUNC) &_arrow_DictionaryArray__dictionary, 1}, + {"_arrow_Array__as_vector", (DL_FUNC) &_arrow_Array__as_vector, 1}, + {"_arrow_ChunkedArray__as_vector", (DL_FUNC) &_arrow_ChunkedArray__as_vector, 1}, + {"_arrow_RecordBatch__to_dataframe", (DL_FUNC) &_arrow_RecordBatch__to_dataframe, 2}, + {"_arrow_Table__to_dataframe", (DL_FUNC) &_arrow_Table__to_dataframe, 2}, {"_arrow_ArrayData__get_type", (DL_FUNC) &_arrow_ArrayData__get_type, 1}, {"_arrow_ArrayData__get_length", (DL_FUNC) &_arrow_ArrayData__get_length, 1}, {"_arrow_ArrayData__get_null_count", (DL_FUNC) &_arrow_ArrayData__get_null_count, 1}, @@ -2386,7 +2410,6 @@ static 
const R_CallMethodDef CallEntries[] = { {"_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1}, {"_arrow_RecordBatch__columns", (DL_FUNC) &_arrow_RecordBatch__columns, 1}, {"_arrow_RecordBatch__column", (DL_FUNC) &_arrow_RecordBatch__column, 2}, - {"_arrow_RecordBatch__to_dataframe", (DL_FUNC) &_arrow_RecordBatch__to_dataframe, 1}, {"_arrow_RecordBatch__from_dataframe", (DL_FUNC) &_arrow_RecordBatch__from_dataframe, 1}, {"_arrow_RecordBatch__Equals", (DL_FUNC) &_arrow_RecordBatch__Equals, 2}, {"_arrow_RecordBatch__RemoveColumn", (DL_FUNC) &_arrow_RecordBatch__RemoveColumn, 2}, @@ -2416,9 +2439,10 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1}, {"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1}, {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1}, - {"_arrow_Table__to_dataframe", (DL_FUNC) &_arrow_Table__to_dataframe, 1}, {"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2}, {"_arrow_Table__columns", (DL_FUNC) &_arrow_Table__columns, 1}, + {"_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0}, + {"_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, {NULL, NULL, 0} }; diff --git a/r/src/array.cpp b/r/src/array.cpp index 901f2b6..dd0d7e6 100644 --- a/r/src/array.cpp +++ b/r/src/array.cpp @@ -33,9 +33,6 @@ inline bool isna<REALSXP>(double x) { return ISNA(x); } -// the integer64 sentinel -constexpr int64_t NA_INT64 = std::numeric_limits<int64_t>::min(); - template <int RTYPE, typename Type> std::shared_ptr<Array> SimpleArray(SEXP x) { Rcpp::Vector<RTYPE> vec(x); @@ -503,499 +500,6 @@ std::shared_ptr<arrow::Array> Array__from_vector(SEXP x) { return nullptr; } -// ---------------------------- Array -> R vector - -namespace arrow { -namespace r { - -template <typename Converter, typename... Args> -SEXP ArrayVector_To_Vector(int64_t n, const ArrayVector& arrays, Args... 
args) { - Converter converter(n, std::forward<Args>(args)...); - - R_xlen_t k = 0; - for (const auto& array : arrays) { - auto n_chunk = array->length(); - converter.Ingest(array, k, n_chunk); - k += n_chunk; - } - return converter.data; -} - -template <int RTYPE> -struct Converter_SimpleArray { - using Vector = Rcpp::Vector<RTYPE>; - - Converter_SimpleArray(R_xlen_t n) : data(no_init(n)) {} - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - using value_type = typename Vector::stored_type; - auto null_count = array->null_count(); - - if (n == null_count) { - std::fill_n(data.begin() + start, n, default_value<RTYPE>()); - } else { - auto p_values = array->data()->GetValues<value_type>(1); - STOP_IF_NULL(p_values); - - // first copy all the data - std::copy_n(p_values, n, data.begin() + start); - - if (null_count) { - // then set the sentinel NA - arrow::internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), - array->offset(), n); - - for (size_t i = 0; i < n; i++, bitmap_reader.Next()) { - if (bitmap_reader.IsNotSet()) { - data[i + start] = default_value<RTYPE>(); - } - } - } - } - } - - Vector data; -}; - -struct Converter_String { - Converter_String(R_xlen_t n) : data(n) {} - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - auto null_count = array->null_count(); - - if (null_count == n) { - std::fill_n(data.begin(), n, NA_STRING); - } else { - auto p_offset = array->data()->GetValues<int32_t>(1); - STOP_IF_NULL(p_offset); - auto p_data = array->data()->GetValues<char>(2, *p_offset); - if (!p_data) { - // There is an offset buffer, but the data buffer is null - // There is at least one value in the array and not all the values are null - // That means all values are empty strings so there is nothing to do - return; - } - - if (null_count) { - // need to watch for nulls - arrow::internal::BitmapReader null_reader(array->null_bitmap_data(), - array->offset(), n); - for (int 
i = 0; i < n; i++, null_reader.Next()) { - if (null_reader.IsSet()) { - auto diff = p_offset[i + 1] - p_offset[i]; - SET_STRING_ELT(data, start + i, Rf_mkCharLenCE(p_data, diff, CE_UTF8)); - p_data += diff; - } else { - SET_STRING_ELT(data, start + i, NA_STRING); - } - } - - } else { - // no need to check for nulls - // TODO: altrep mark this as no na - for (int i = 0; i < n; i++) { - auto diff = p_offset[i + 1] - p_offset[i]; - SET_STRING_ELT(data, start + i, Rf_mkCharLenCE(p_data, diff, CE_UTF8)); - p_data += diff; - } - } - } - } - - CharacterVector data; -}; - -struct Converter_Boolean { - Converter_Boolean(R_xlen_t n) : data(n) {} - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - auto null_count = array->null_count(); - - if (n == null_count) { - std::fill_n(data.begin() + start, n, NA_LOGICAL); - } else { - // process the data - auto p_data = array->data()->GetValues<uint8_t>(1, 0); - STOP_IF_NULL(p_data); - - arrow::internal::BitmapReader data_reader(p_data, array->offset(), n); - for (size_t i = 0; i < n; i++, data_reader.Next()) { - data[start + i] = data_reader.IsSet(); - } - - // then the null bitmap if needed - if (null_count) { - arrow::internal::BitmapReader null_reader(array->null_bitmap()->data(), - array->offset(), n); - for (size_t i = 0; i < n; i++, null_reader.Next()) { - if (null_reader.IsNotSet()) { - data[start + i] = NA_LOGICAL; - } - } - } - } - } - - LogicalVector data; -}; - -template <typename Type> -struct Converter_Dictionary_Int32Indices { - Converter_Dictionary_Int32Indices(R_xlen_t n, const std::shared_ptr<arrow::Array>& dict, - bool ordered) - : data(no_init(n)) { - data.attr("levels") = ArrayVector_To_Vector<Converter_String>(dict->length(), {dict}); - if (ordered) { - data.attr("class") = CharacterVector::create("ordered", "factor"); - } else { - data.attr("class") = "factor"; - } - } - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - 
DictionaryArray* dict_array = static_cast<DictionaryArray*>(array.get()); - using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type; - auto null_count = array->null_count(); - - if (n == null_count) { - std::fill_n(data.begin() + start, n, NA_INTEGER); - } else { - std::shared_ptr<Array> indices = dict_array->indices(); - auto p_array = indices->data()->GetValues<value_type>(1); - STOP_IF_NULL(p_array); - - if (array->null_count()) { - arrow::internal::BitmapReader bitmap_reader(indices->null_bitmap()->data(), - indices->offset(), n); - for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_array) { - data[start + i] = - bitmap_reader.IsNotSet() ? NA_INTEGER : (static_cast<int>(*p_array) + 1); - } - } else { - std::transform( - p_array, p_array + n, data.begin() + start, - [](const value_type value) { return static_cast<int>(value) + 1; }); - } - } - } - - IntegerVector data; -}; - -struct Converter_Date64 { - Converter_Date64(R_xlen_t n) : data(n) { - data.attr("class") = CharacterVector::create("POSIXct", "POSIXt"); - } - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - auto null_count = array->null_count(); - if (null_count == n) { - std::fill_n(data.begin() + start, n, NA_REAL); - } else { - auto p_values = array->data()->GetValues<int64_t>(1); - STOP_IF_NULL(p_values); - auto p_vec = data.begin() + start; - - // convert DATE64 milliseconds to R seconds (stored as double) - auto seconds = [](int64_t ms) { return static_cast<double>(ms / 1000); }; - - if (null_count) { - arrow::internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), - array->offset(), n); - for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_vec, ++p_values) { - *p_vec = bitmap_reader.IsSet() ? 
seconds(*p_values) : NA_REAL; - } - } else { - std::transform(p_values, p_values + n, p_vec, seconds); - } - } - } - - NumericVector data; -}; - -template <int RTYPE, typename Type> -struct Converter_Promotion { - using r_stored_type = typename Rcpp::Vector<RTYPE>::stored_type; - using value_type = typename TypeTraits<Type>::ArrayType::value_type; - - Converter_Promotion(R_xlen_t n) : data(no_init(n)) {} - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - auto null_count = array->null_count(); - if (null_count == n) { - std::fill_n(data.begin() + start, n, default_value<RTYPE>()); - } else { - auto p_values = array->data()->GetValues<value_type>(1); - STOP_IF_NULL(p_values); - - auto value_convert = [](value_type value) { - return static_cast<r_stored_type>(value); - }; - if (null_count) { - internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), - array->offset(), n); - for (size_t i = 0; i < n; i++, bitmap_reader.Next()) { - data[start + i] = bitmap_reader.IsNotSet() ? 
Rcpp::Vector<RTYPE>::get_na() - : value_convert(p_values[i]); - } - } else { - std::transform(p_values, p_values + n, data.begin(), value_convert); - } - } - } - - Rcpp::Vector<RTYPE> data; -}; - -template <typename value_type> -struct Converter_Time { - Converter_Time(int64_t n, int32_t multiplier, CharacterVector classes) - : data(no_init(n)), multiplier_(multiplier) { - data.attr("class") = classes; - } - - Converter_Time(int64_t n, int32_t multiplier) - : data(no_init(n)), multiplier_(multiplier) { - data.attr("class") = CharacterVector::create("hms", "difftime"); - data.attr("units") = "secs"; - } - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - auto null_count = array->null_count(); - if (n == null_count) { - std::fill_n(data.begin() + start, n, NA_REAL); - } else { - auto p_values = array->data()->GetValues<value_type>(1); - STOP_IF_NULL(p_values); - auto p_vec = data.begin() + start; - auto convert = [this](value_type value) { - return static_cast<double>(value) / multiplier_; - }; - if (null_count) { - arrow::internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), - array->offset(), n); - for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_vec, ++p_values) { - *p_vec = bitmap_reader.IsSet() ? 
convert(*p_values) : NA_REAL; - } - } else { - std::transform(p_values, p_values + n, p_vec, convert); - } - } - } - - NumericVector data; - int32_t multiplier_; -}; - -template <typename value_type> -struct Converter_TimeStamp : Converter_Time<value_type> { - Converter_TimeStamp(int64_t n, int32_t multiplier) - : Converter_Time<value_type>(n, multiplier, - CharacterVector::create("POSIXct", "POSIXt")) {} -}; - -struct Converter_Int64 { - Converter_Int64(R_xlen_t n) : data(no_init(n)) { data.attr("class") = "integer64"; } - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - auto null_count = array->null_count(); - if (null_count == n) { - std::fill_n(reinterpret_cast<int64_t*>(data.begin()) + start, n, NA_INT64); - } else { - auto p_values = array->data()->GetValues<int64_t>(1); - STOP_IF_NULL(p_values); - auto p_vec = reinterpret_cast<int64_t*>(data.begin()) + start; - - if (array->null_count()) { - internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), - array->offset(), n); - for (size_t i = 0; i < n; i++, bitmap_reader.Next()) { - p_vec[i] = bitmap_reader.IsNotSet() ? 
NA_INT64 : p_values[i]; - } - } else { - std::copy_n(p_values, n, p_vec); - } - } - } - - NumericVector data; -}; - -SEXP DictionaryArrays_to_Vector(int64_t n, const ArrayVector& arrays) { - DictionaryArray* dict_array = static_cast<DictionaryArray*>(arrays[0].get()); - auto dict = dict_array->dictionary(); - auto indices = dict_array->indices(); - - if (dict->type_id() != Type::STRING) { - stop("Cannot convert Dictionary Array of type `%s` to R", - dict_array->type()->ToString()); - } - bool ordered = dict_array->dict_type()->ordered(); - switch (indices->type_id()) { - case Type::UINT8: - return ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::UInt8Type>>( - n, arrays, dict, ordered); - - case Type::INT8: - return ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::Int8Type>>( - n, arrays, dict, ordered); - - case Type::UINT16: - return ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::UInt16Type>>( - n, arrays, dict, ordered); - - case Type::INT16: - return ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::Int16Type>>( - n, arrays, dict, ordered); - - case Type::INT32: - return ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::Int32Type>>( - n, arrays, dict, ordered); - - default: - stop("Cannot convert Dictionary Array of type `%s` to R", - dict_array->type()->ToString()); - } - return R_NilValue; -} - -SEXP Date32ArrayVector_to_Vector(int64_t n, const ArrayVector& arrays) { - IntegerVector out( - arrow::r::ArrayVector_To_Vector<Converter_SimpleArray<INTSXP>>(n, arrays)); - out.attr("class") = "Date"; - return out; -} - -struct Converter_Decimal { - Converter_Decimal(R_xlen_t n) : data(no_init(n)) {} - - void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, R_xlen_t n) { - auto null_count = array->null_count(); - if (n == null_count) { - std::fill_n(data.begin() + start, n, NA_REAL); - } else { - auto p_vec = reinterpret_cast<double*>(data.begin()) + start; - const auto& 
decimals_arr = - internal::checked_cast<const arrow::Decimal128Array&>(*array); - - if (array->null_count()) { - internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), - array->offset(), n); - - for (size_t i = 0; i < n; i++, bitmap_reader.Next()) { - p_vec[i] = bitmap_reader.IsNotSet() - ? NA_REAL - : std::stod(decimals_arr.FormatValue(i).c_str()); - } - } else { - for (size_t i = 0; i < n; i++) { - p_vec[i] = std::stod(decimals_arr.FormatValue(i).c_str()); - } - } - } - } - - NumericVector data; -}; - -} // namespace r -} // namespace arrow - -SEXP ArrayVector__as_vector(int64_t n, const ArrayVector& arrays) { - using namespace arrow::r; - - switch (arrays[0]->type_id()) { - // direct support - case Type::INT8: - return ArrayVector_To_Vector<Converter_SimpleArray<RAWSXP>>(n, arrays); - case Type::INT32: - return ArrayVector_To_Vector<Converter_SimpleArray<INTSXP>>(n, arrays); - case Type::DOUBLE: - return ArrayVector_To_Vector<Converter_SimpleArray<REALSXP>>(n, arrays); - - // need to handle 1-bit case - case Type::BOOL: - return ArrayVector_To_Vector<Converter_Boolean>(n, arrays); - - // handle memory dense strings - case Type::STRING: - return ArrayVector_To_Vector<Converter_String>(n, arrays); - case Type::DICTIONARY: - return DictionaryArrays_to_Vector(n, arrays); - - case Type::DATE32: - return Date32ArrayVector_to_Vector(n, arrays); - case Type::DATE64: - return ArrayVector_To_Vector<Converter_Date64>(n, arrays); - - // promotions to integer vector - case Type::UINT8: - return ArrayVector_To_Vector<Converter_Promotion<INTSXP, arrow::UInt8Type>>(n, - arrays); - case Type::INT16: - return ArrayVector_To_Vector<Converter_Promotion<INTSXP, arrow::Int16Type>>(n, - arrays); - case Type::UINT16: - return ArrayVector_To_Vector<Converter_Promotion<INTSXP, arrow::UInt16Type>>( - n, arrays); - - // promotions to numeric vector - case Type::UINT32: - return ArrayVector_To_Vector<Converter_Promotion<REALSXP, arrow::UInt32Type>>( - n, arrays); - case 
Type::HALF_FLOAT: - return ArrayVector_To_Vector<Converter_Promotion<REALSXP, arrow::HalfFloatType>>( - n, arrays); - case Type::FLOAT: - return ArrayVector_To_Vector<Converter_Promotion<REALSXP, arrow::FloatType>>( - n, arrays); - - // time32 ane time64 - case Type::TIME32: - return ArrayVector_To_Vector<Converter_Time<int32_t>>( - n, arrays, - static_cast<TimeType*>(arrays[0]->type().get())->unit() == TimeUnit::SECOND - ? 1 - : 1000); - - case Type::TIME64: - return ArrayVector_To_Vector<Converter_Time<int64_t>>( - n, arrays, - static_cast<TimeType*>(arrays[0]->type().get())->unit() == TimeUnit::MICRO - ? 1000000 - : 1000000000); - - case Type::TIMESTAMP: - return ArrayVector_To_Vector<Converter_TimeStamp<int64_t>>( - n, arrays, - static_cast<TimeType*>(arrays[0]->type().get())->unit() == TimeUnit::MICRO - ? 1000000 - : 1000000000); - - case Type::INT64: - return ArrayVector_To_Vector<Converter_Int64>(n, arrays); - case Type::DECIMAL: - return ArrayVector_To_Vector<Converter_Decimal>(n, arrays); - - default: - break; - } - - stop(tfm::format("cannot handle Array of type %s", arrays[0]->type()->name())); - return R_NilValue; -} - -// [[Rcpp::export]] -SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) { - return ArrayVector__as_vector(array->length(), {array}); -} - -// [[Rcpp::export]] -SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array) { - return ArrayVector__as_vector(chunked_array->length(), chunked_array->chunks()); -} - // [[Rcpp::export]] std::shared_ptr<arrow::Array> Array__Slice1(const std::shared_ptr<arrow::Array>& array, int offset) { diff --git a/r/src/array__to_vector.cpp b/r/src/array__to_vector.cpp new file mode 100644 index 0000000..c531933 --- /dev/null +++ b/r/src/array__to_vector.cpp @@ -0,0 +1,697 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/util/parallel.h> +#include <arrow/util/task-group.h> +#include "arrow_types.h" + +using namespace Rcpp; +using namespace arrow; + +namespace arrow { +namespace r { + +class Converter { + public: + Converter(const ArrayVector& arrays) : arrays_(arrays) {} + + virtual ~Converter() {} + + // Allocate a vector of the right R type for this converter + virtual SEXP Allocate(R_xlen_t n) const = 0; + + // data[ start:(start + n) ] = NA + virtual Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const = 0; + + // ingest the values from the array into data[ start : (start + n)] + virtual Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const = 0; + + // ingest one array + Status IngestOne(SEXP data, const std::shared_ptr<arrow::Array>& array, R_xlen_t start, + R_xlen_t n) const { + if (array->null_count() == n) { + return Ingest_all_nulls(data, start, n); + } else { + return Ingest_some_nulls(data, array, start, n); + } + } + + // can this run in parallel ? 
+ virtual bool Parallel() const { return true; } + + // Ingest all the arrays serially + Status IngestSerial(SEXP data) { + R_xlen_t k = 0; + for (const auto& array : arrays_) { + auto n_chunk = array->length(); + RETURN_NOT_OK(IngestOne(data, array, k, n_chunk)); + k += n_chunk; + } + return Status::OK(); + } + + // ingest the arrays in parallel + // + // for each array, add a task to the task group + // + // The task group is Finish() iun the caller + void IngestParallel(SEXP data, const std::shared_ptr<arrow::internal::TaskGroup>& tg) { + R_xlen_t k = 0; + for (const auto& array : arrays_) { + auto n_chunk = array->length(); + tg->Append([=] { return IngestOne(data, array, k, n_chunk); }); + k += n_chunk; + } + } + + // Converter factory + static std::shared_ptr<Converter> Make(const ArrayVector& arrays); + + protected: + const ArrayVector& arrays_; +}; + +// data[start:(start+n)] = NA +template <int RTYPE> +Status AllNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n) { + auto p_data = Rcpp::internal::r_vector_start<RTYPE>(data) + start; + std::fill_n(p_data, n, default_value<RTYPE>()); + return Status::OK(); +} + +// ingest the data from `array` into a slice of `data` +// +// each element goes through `lambda` when some conversion is needed +template <int RTYPE, typename array_value_type, typename Lambda> +Status SomeNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n, + const array_value_type* p_values, + const std::shared_ptr<arrow::Array>& array, Lambda lambda) { + if (!p_values) { + return Status::Invalid("Invalid data buffer"); + } + auto p_data = Rcpp::internal::r_vector_start<RTYPE>(data) + start; + + if (array->null_count()) { + arrow::internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), + array->offset(), n); + for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data, ++p_values) { + *p_data = bitmap_reader.IsSet() ? 
lambda(*p_values) : default_value<RTYPE>(); + } + } else { + std::transform(p_values, p_values + n, p_data, lambda); + } + + return Status::OK(); +} + +// Allocate + Ingest +SEXP ArrayVector__as_vector(R_xlen_t n, const ArrayVector& arrays) { + auto converter = Converter::Make(arrays); + Shield<SEXP> data(converter->Allocate(n)); + STOP_IF_NOT_OK(converter->IngestSerial(data)); + return data; +} + +template <int RTYPE> +class Converter_SimpleArray : public Converter { + using Vector = Rcpp::Vector<RTYPE, Rcpp::NoProtectStorage>; + using value_type = typename Vector::stored_type; + + public: + Converter_SimpleArray(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { return Vector(no_init(n)); } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + return AllNull_Ingest<RTYPE>(data, start, n); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + auto p_values = array->data()->GetValues<value_type>(1); + auto echo = [](value_type value) { return value; }; + return SomeNull_Ingest<RTYPE, value_type>(data, start, n, p_values, array, echo); + } +}; + +class Converter_Date32 : public Converter_SimpleArray<INTSXP> { + public: + Converter_Date32(const ArrayVector& arrays) : Converter_SimpleArray<INTSXP>(arrays) {} + + SEXP Allocate(R_xlen_t n) const { + IntegerVector data(no_init(n)); + data.attr("class") = "Date"; + return data; + } +}; + +struct Converter_String : public Converter { + public: + Converter_String(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { return StringVector_(no_init(n)); } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + return AllNull_Ingest<STRSXP>(data, start, n); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + auto p_offset = array->data()->GetValues<int32_t>(1); + if 
(!p_offset) { + return Status::Invalid("Invalid offset buffer"); + } + auto p_strings = array->data()->GetValues<char>(2, *p_offset); + if (!p_strings) { + // There is an offset buffer, but the data buffer is null + // There is at least one value in the array and not all the values are null + // That means all values are either empty strings or nulls so there is nothing to do + + if (array->null_count()) { + arrow::internal::BitmapReader null_reader(array->null_bitmap_data(), + array->offset(), n); + for (int i = 0; i < n; i++, null_reader.Next()) { + if (null_reader.IsNotSet()) { + SET_STRING_ELT(data, start + i, NA_STRING); + } + } + } + return Status::OK(); + } + + arrow::StringArray* string_array = static_cast<arrow::StringArray*>(array.get()); + if (array->null_count()) { + // need to watch for nulls + arrow::internal::BitmapReader null_reader(array->null_bitmap_data(), + array->offset(), n); + for (int i = 0; i < n; i++, null_reader.Next()) { + if (null_reader.IsSet()) { + SET_STRING_ELT(data, start + i, r_string(string_array->GetString(i))); + } else { + SET_STRING_ELT(data, start + i, NA_STRING); + } + } + + } else { + for (int i = 0; i < n; i++) { + SET_STRING_ELT(data, start + i, r_string(string_array->GetString(i))); + } + } + + return Status::OK(); + } + + bool Parallel() const { return false; } + + inline SEXP r_string(const arrow::util::string_view& view) const { + return Rf_mkCharLenCE(view.data(), view.size(), CE_UTF8); + } +}; + +class Converter_Boolean : public Converter { + public: + Converter_Boolean(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { return LogicalVector_(no_init(n)); } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + return AllNull_Ingest<LGLSXP>(data, start, n); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + auto p_data = Rcpp::internal::r_vector_start<LGLSXP>(data) + start; + auto 
p_bools = array->data()->GetValues<uint8_t>(1, 0); + if (!p_bools) { + return Status::Invalid("Invalid data buffer"); + } + + arrow::internal::BitmapReader data_reader(p_bools, array->offset(), n); + if (array->null_count()) { + arrow::internal::BitmapReader null_reader(array->null_bitmap()->data(), + array->offset(), n); + + for (size_t i = 0; i < n; i++, data_reader.Next(), null_reader.Next(), ++p_data) { + *p_data = null_reader.IsSet() ? data_reader.IsSet() : NA_LOGICAL; + } + } else { + for (size_t i = 0; i < n; i++, data_reader.Next(), ++p_data) { + *p_data = data_reader.IsSet(); + } + } + + return Status::OK(); + } +}; + +class Converter_Dictionary : public Converter { + public: + Converter_Dictionary(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { + IntegerVector data(no_init(n)); + auto dict_array = static_cast<DictionaryArray*>(Converter::arrays_[0].get()); + auto dict = dict_array->dictionary(); + auto indices = dict_array->indices(); + switch (indices->type_id()) { + case Type::UINT8: + case Type::INT8: + case Type::UINT16: + case Type::INT16: + case Type::INT32: + break; + default: + stop("Cannot convert Dictionary Array of type `%s` to R", + dict_array->type()->ToString()); + } + + if (dict->type_id() != Type::STRING) { + stop("Cannot convert Dictionary Array of type `%s` to R", + dict_array->type()->ToString()); + } + bool ordered = dict_array->dict_type()->ordered(); + + data.attr("levels") = ArrayVector__as_vector(dict->length(), {dict}); + if (ordered) { + data.attr("class") = CharacterVector::create("ordered", "factor"); + } else { + data.attr("class") = "factor"; + } + return data; + } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + return AllNull_Ingest<INTSXP>(data, start, n); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + DictionaryArray* dict_array = static_cast<DictionaryArray*>(array.get()); + 
auto indices = dict_array->indices(); + switch (indices->type_id()) { + case Type::UINT8: + return Ingest_some_nulls_Impl<arrow::UInt8Type>(data, array, start, n); + case Type::INT8: + return Ingest_some_nulls_Impl<arrow::Int8Type>(data, array, start, n); + case Type::UINT16: + return Ingest_some_nulls_Impl<arrow::UInt16Type>(data, array, start, n); + case Type::INT16: + return Ingest_some_nulls_Impl<arrow::Int16Type>(data, array, start, n); + case Type::INT32: + return Ingest_some_nulls_Impl<arrow::Int32Type>(data, array, start, n); + default: + break; + } + return Status::OK(); + } + + private: + template <typename Type> + Status Ingest_some_nulls_Impl(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type; + + std::shared_ptr<Array> indices = + static_cast<DictionaryArray*>(array.get())->indices(); + + // convert the 0-based indices from the arrow Array + // to 1-based indices used in R factors + auto to_r_index = [](value_type value) { return static_cast<int>(value) + 1; }; + + return SomeNull_Ingest<INTSXP, value_type>( + data, start, n, indices->data()->GetValues<value_type>(1), indices, to_r_index); + } +}; + +double ms_to_seconds(int64_t ms) { return static_cast<double>(ms / 1000); } + +class Converter_Date64 : public Converter { + public: + Converter_Date64(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { + NumericVector data(no_init(n)); + data.attr("class") = CharacterVector::create("POSIXct", "POSIXt"); + return data; + } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + return AllNull_Ingest<REALSXP>(data, start, n); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + auto convert = [](int64_t ms) { return static_cast<double>(ms / 1000); }; + return SomeNull_Ingest<REALSXP, int64_t>( + data, start, 
n, array->data()->GetValues<int64_t>(1), array, convert); + } +}; + +template <int RTYPE, typename Type> +class Converter_Promotion : public Converter { + using r_stored_type = typename Rcpp::Vector<RTYPE>::stored_type; + using value_type = typename TypeTraits<Type>::ArrayType::value_type; + + public: + Converter_Promotion(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { + return Rcpp::Vector<RTYPE, NoProtectStorage>(no_init(n)); + } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + return AllNull_Ingest<RTYPE>(data, start, n); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + auto convert = [](value_type value) { return static_cast<r_stored_type>(value); }; + return SomeNull_Ingest<RTYPE, value_type>( + data, start, n, array->data()->GetValues<value_type>(1), array, convert); + } + + private: + static r_stored_type value_convert(value_type value) { + return static_cast<r_stored_type>(value); + } +}; + +template <typename value_type> +class Converter_Time : public Converter { + public: + Converter_Time(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { + NumericVector data(no_init(n)); + data.attr("class") = CharacterVector::create("hms", "difftime"); + data.attr("units") = CharacterVector::create("secs"); + return data; + } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + return AllNull_Ingest<REALSXP>(data, start, n); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + int multiplier = TimeUnit_multiplier(array); + auto convert = [=](value_type value) { + return static_cast<double>(value) / multiplier; + }; + return SomeNull_Ingest<REALSXP, value_type>( + data, start, n, array->data()->GetValues<value_type>(1), array, convert); + } + + private: + int TimeUnit_multiplier(const 
std::shared_ptr<Array>& array) const { + switch (static_cast<TimeType*>(array->type().get())->unit()) { + case TimeUnit::SECOND: + return 1; + case TimeUnit::MILLI: + return 1000; + case TimeUnit::MICRO: + return 1000000; + case TimeUnit::NANO: + return 1000000000; + } + } +}; + +template <typename value_type> +class Converter_Timestamp : public Converter_Time<value_type> { + public: + Converter_Timestamp(const ArrayVector& arrays) : Converter_Time<value_type>(arrays) {} + + SEXP Allocate(R_xlen_t n) const { + NumericVector data(no_init(n)); + data.attr("class") = CharacterVector::create("POSIXct", "POSIXt"); + return data; + } +}; + +class Converter_Decimal : public Converter { + public: + Converter_Decimal(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { return NumericVector_(no_init(n)); } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + return AllNull_Ingest<REALSXP>(data, start, n); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + auto p_data = Rcpp::internal::r_vector_start<REALSXP>(data) + start; + const auto& decimals_arr = + internal::checked_cast<const arrow::Decimal128Array&>(*array); + + internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), + n); + + for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) { + *p_data = bitmap_reader.IsSet() ? 
std::stod(decimals_arr.FormatValue(i).c_str()) + : NA_REAL; + } + + return Status::OK(); + } +}; + +class Converter_Int64 : public Converter { + public: + Converter_Int64(const ArrayVector& arrays) : Converter(arrays) {} + + SEXP Allocate(R_xlen_t n) const { + NumericVector data(no_init(n)); + data.attr("class") = "integer64"; + return data; + } + + Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { + auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start; + std::fill_n(p_data, n, NA_INT64); + return Status::OK(); + } + + Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, + R_xlen_t start, R_xlen_t n) const { + auto p_values = array->data()->GetValues<int64_t>(1); + if (!p_values) { + return Status::Invalid("Invalid data buffer"); + } + + auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start; + + if (array->null_count()) { + internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), + n); + for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) { + *p_data = bitmap_reader.IsSet() ? 
p_values[i] : NA_INT64; + } + } else { + std::copy_n(p_values, n, p_data); + } + + return Status::OK(); + } +}; + +std::shared_ptr<Converter> Converter::Make(const ArrayVector& arrays) { + using namespace arrow::r; + + switch (arrays[0]->type_id()) { + // direct support + case Type::INT8: + return std::make_shared<Converter_SimpleArray<RAWSXP>>(arrays); + + case Type::INT32: + return std::make_shared<Converter_SimpleArray<INTSXP>>(arrays); + + case Type::DOUBLE: + return std::make_shared<Converter_SimpleArray<REALSXP>>(arrays); + + // need to handle 1-bit case + case Type::BOOL: + return std::make_shared<Converter_Boolean>(arrays); + + // handle memory dense strings + case Type::STRING: + return std::make_shared<Converter_String>(arrays); + + case Type::DICTIONARY: + return std::make_shared<Converter_Dictionary>(arrays); + + case Type::DATE32: + return std::make_shared<Converter_Date32>(arrays); + + case Type::DATE64: + return std::make_shared<Converter_Date64>(arrays); + + // promotions to integer vector + case Type::UINT8: + return std::make_shared<Converter_Promotion<INTSXP, arrow::UInt8Type>>(arrays); + + case Type::INT16: + return std::make_shared<Converter_Promotion<INTSXP, arrow::Int16Type>>(arrays); + + case Type::UINT16: + return std::make_shared<Converter_Promotion<INTSXP, arrow::UInt16Type>>(arrays); + + // promotions to numeric vector + case Type::UINT32: + return std::make_shared<Converter_Promotion<REALSXP, arrow::UInt32Type>>(arrays); + + case Type::HALF_FLOAT: + return std::make_shared<Converter_Promotion<REALSXP, arrow::HalfFloatType>>(arrays); + + case Type::FLOAT: + return std::make_shared<Converter_Promotion<REALSXP, arrow::FloatType>>(arrays); + + // time32 ane time64 + case Type::TIME32: + return std::make_shared<Converter_Time<int32_t>>(arrays); + + case Type::TIME64: + return std::make_shared<Converter_Time<int64_t>>(arrays); + + case Type::TIMESTAMP: + return std::make_shared<Converter_Timestamp<int64_t>>(arrays); + + case Type::INT64: + 
return std::make_shared<Converter_Int64>(arrays); + + case Type::DECIMAL: + return std::make_shared<Converter_Decimal>(arrays); + + default: + break; + } + + stop(tfm::format("cannot handle Array of type %s", arrays[0]->type()->name())); + return nullptr; +} + +List to_dataframe_serial(int64_t nr, int64_t nc, const CharacterVector& names, + const std::vector<std::shared_ptr<Converter>>& converters) { + List tbl(nc); + + for (int i = 0; i < nc; i++) { + SEXP column = tbl[i] = converters[i]->Allocate(nr); + STOP_IF_NOT_OK(converters[i]->IngestSerial(column)); + } + tbl.attr("names") = names; + tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame"); + tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr); + return tbl; +} + +List to_dataframe_parallel(int64_t nr, int64_t nc, const CharacterVector& names, + const std::vector<std::shared_ptr<Converter>>& converters) { + List tbl(nc); + + // task group to ingest data in parallel + auto tg = arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); + + // allocate and start ingesting immediately the columns that + // can be ingested in parallel, i.e. 
when ingestion no longer + // need to happen on the main thread + for (int i = 0; i < nc; i++) { + // allocate data for column i + SEXP column = tbl[i] = converters[i]->Allocate(nr); + + // add a task to ingest data of that column if that can be done in parallel + if (converters[i]->Parallel()) { + converters[i]->IngestParallel(column, tg); + } + } + + arrow::Status status = arrow::Status::OK(); + + // ingest the columns that cannot be dealt with in parallel + for (int i = 0; i < nc; i++) { + if (!converters[i]->Parallel()) { + status &= converters[i]->IngestSerial(tbl[i]); + } + } + + // wait for the ingestion to be finished + status &= tg->Finish(); + + STOP_IF_NOT_OK(status); + + tbl.attr("names") = names; + tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame"); + tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr); + + return tbl; +} + +} // namespace r +} // namespace arrow + +// [[Rcpp::export]] +SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) { + return arrow::r::ArrayVector__as_vector(array->length(), {array}); +} + +// [[Rcpp::export]] +SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array) { + return arrow::r::ArrayVector__as_vector(chunked_array->length(), + chunked_array->chunks()); +} + +// [[Rcpp::export]] +List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch, + bool use_threads) { + int64_t nc = batch->num_columns(); + int64_t nr = batch->num_rows(); + CharacterVector names(nc); + std::vector<ArrayVector> arrays(nc); + std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc); + + for (int64_t i = 0; i < nc; i++) { + names[i] = batch->column_name(i); + arrays[i] = {batch->column(i)}; + converters[i] = arrow::r::Converter::Make(arrays[i]); + } + + if (use_threads) { + return arrow::r::to_dataframe_parallel(nr, nc, names, converters); + } else { + return arrow::r::to_dataframe_serial(nr, nc, names, converters); + } +} + +// 
[[Rcpp::export]] +List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table, bool use_threads) { + int64_t nc = table->num_columns(); + int64_t nr = table->num_rows(); + CharacterVector names(nc); + std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc); + + for (int64_t i = 0; i < nc; i++) { + converters[i] = arrow::r::Converter::Make(table->column(i)->data()->chunks()); + names[i] = table->column(i)->name(); + } + + if (use_threads) { + return arrow::r::to_dataframe_parallel(nr, nc, names, converters); + } else { + return arrow::r::to_dataframe_serial(nr, nc, names, converters); + } +} diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 6fef799..a657731 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -52,6 +52,8 @@ namespace r { struct symbols { static SEXP units; static SEXP xp; + static SEXP dot_Internal; + static SEXP inspect; }; } // namespace r } // namespace arrow @@ -148,6 +150,7 @@ inline SEXP wrap_dispatch(const T& x, Rcpp::traits::wrap_type_unique_ptr_tag) { } // namespace Rcpp namespace Rcpp { +using NumericVector_ = Rcpp::Vector<REALSXP, Rcpp::NoProtectStorage>; using IntegerVector_ = Rcpp::Vector<INTSXP, Rcpp::NoProtectStorage>; using LogicalVector_ = Rcpp::Vector<LGLSXP, Rcpp::NoProtectStorage>; using StringVector_ = Rcpp::Vector<STRSXP, Rcpp::NoProtectStorage>; @@ -156,11 +159,11 @@ using RawVector_ = Rcpp::Vector<RAWSXP, Rcpp::NoProtectStorage>; using List_ = Rcpp::Vector<VECSXP, Rcpp::NoProtectStorage>; template <int RTYPE> -inline typename Rcpp::Vector<RTYPE>::stored_type default_value() { +inline constexpr typename Rcpp::Vector<RTYPE>::stored_type default_value() { return Rcpp::Vector<RTYPE>::get_na(); } template <> -inline Rbyte default_value<RAWSXP>() { +inline constexpr Rbyte default_value<RAWSXP>() { return 0; } @@ -174,6 +177,11 @@ std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(Rcpp::DataFrame namespace arrow { namespace r { +void inspect(SEXP obj); + +// the integer64 sentinel 
+constexpr int64_t NA_INT64 = std::numeric_limits<int64_t>::min(); + template <int RTYPE, typename Vec = Rcpp::Vector<RTYPE>> class RBuffer : public MutableBuffer { public: diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index b6bee7a..b776d2a 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -58,22 +58,6 @@ std::shared_ptr<arrow::Array> RecordBatch__column( } // [[Rcpp::export]] -List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch) { - int nc = batch->num_columns(); - int nr = batch->num_rows(); - List tbl(nc); - CharacterVector names(nc); - for (int i = 0; i < nc; i++) { - tbl[i] = Array__as_vector(batch->column(i)); - names[i] = batch->column_name(i); - } - tbl.attr("names") = names; - tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame"); - tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr); - return tbl; -} - -// [[Rcpp::export]] std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl) { CharacterVector names = tbl.names(); diff --git a/r/src/symbols.cpp b/r/src/symbols.cpp index e60bcce..5b4e44e 100644 --- a/r/src/symbols.cpp +++ b/r/src/symbols.cpp @@ -21,5 +21,14 @@ namespace arrow { namespace r { SEXP symbols::units = Rf_install("units"); SEXP symbols::xp = Rf_install(".:xp:."); +SEXP symbols::dot_Internal = Rf_install(".Internal"); +SEXP symbols::inspect = Rf_install("inspect"); + +void inspect(SEXP obj) { + Rcpp::Shield<SEXP> call_inspect(Rf_lang2(symbols::inspect, obj)); + Rcpp::Shield<SEXP> call_internal(Rf_lang2(symbols::dot_Internal, call_inspect)); + Rf_eval(call_internal, R_GlobalEnv); +} + } // namespace r } // namespace arrow diff --git a/r/src/table.cpp b/r/src/table.cpp index f4ebd04..fcf2a03 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -46,23 +46,6 @@ std::shared_ptr<arrow::Schema> Table__schema(const std::shared_ptr<arrow::Table> } // [[Rcpp::export]] -List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table) { - 
int nc = table->num_columns(); - int nr = table->num_rows(); - List tbl(nc); - CharacterVector names(nc); - for (int i = 0; i < nc; i++) { - auto column = table->column(i); - tbl[i] = ChunkedArray__as_vector(column->data()); - names[i] = column->name(); - } - tbl.attr("names") = names; - tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame"); - tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr); - return tbl; -} - -// [[Rcpp::export]] std::shared_ptr<arrow::Column> Table__column(const std::shared_ptr<arrow::Table>& table, int i) { return table->column(i); diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp new file mode 100644 index 0000000..1ce0451 --- /dev/null +++ b/r/src/threadpool.cpp @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/util/parallel.h> +#include "arrow_types.h" + +//' Get the capacity of the global thread pool +//' +//' @return the number of worker threads in the thread pool to which +//' Arrow dispatches various CPU-bound tasks. This is an ideal number, +//' not necessarily the exact number of threads at a given point in time. +//' +//' You can change this number using [SetCpuThreadPoolCapacity()]. 
+//' +//' @export +// [[Rcpp::export]] +int GetCpuThreadPoolCapacity() { return arrow::GetCpuThreadPoolCapacity(); } + +//' Set the capacity of the global thread pool +//' +//' @param threads the number of worker threads in the thread pool to which +//' Arrow dispatches various CPU-bound tasks. +//' +//' The current number is returned by [GetCpuThreadPoolCapacity()] +//' +//' @export +// [[Rcpp::export]] +void SetCpuThreadPoolCapacity(int threads) { + STOP_IF_NOT_OK(arrow::SetCpuThreadPoolCapacity(threads)); +} diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R index f40bd83..29f9094 100644 --- a/r/tests/testthat/test-RecordBatch.R +++ b/r/tests/testthat/test-RecordBatch.R @@ -69,7 +69,6 @@ test_that("RecordBatch", { expect_equal(col_fct$as_vector(), tbl$fct) expect_equal(col_fct$type, dictionary(int32(), array(letters[1:10]))) - batch2 <- batch$RemoveColumn(0) expect_equal( batch2$schema, diff --git a/r/R/parquet.R b/r/tests/testthat/test-cputhreadpoolcapacity.R similarity index 63% copy from r/R/parquet.R copy to r/tests/testthat/test-cputhreadpoolcapacity.R index 141da7b..de23f15 100644 --- a/r/R/parquet.R +++ b/r/tests/testthat/test-cputhreadpoolcapacity.R @@ -15,19 +15,12 @@ # specific language governing permissions and limitations # under the License. -#' Read parquet file from disk -#' -#' @param file a file path -#' @param as_tibble should the [arrow::Table][arrow__Table] be converted to a tibble. -#' @param ... currently ignored -#' -#' @return a [arrow::Table][arrow__Table], or a data frame if `as_tibble` is `TRUE`. -#' -#' @export -read_parquet <- function(file, as_tibble = TRUE, ...) 
{ - tab <- shared_ptr(`arrow::Table`, read_parquet_file(f)) - if (isTRUE(as_tibble)) { - tab <- as_tibble(tab) - } - tab -} +context("CpuThreadPoolCapacity") + +test_that("can set/get cpu thread pool capacity", { + old <- GetCpuThreadPoolCapacity() + SetCpuThreadPoolCapacity(19L) + expect_equal(GetCpuThreadPoolCapacity(), 19L) + SetCpuThreadPoolCapacity(old) + expect_equal(GetCpuThreadPoolCapacity(), old) +})