This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b61349  ARROW-2968: [R] Multi-threaded conversion from Arrow table to R data.frame
3b61349 is described below

commit 3b61349b3c16d43003e493c7e2aec9348e7e7343
Author: Romain Francois <rom...@purrple.cat>
AuthorDate: Wed Jan 9 22:00:12 2019 -0600

    ARROW-2968: [R] Multi-threaded conversion from Arrow table to R data.frame
    
    The `as_tibble()` methods for `arrow::RecordBatch` and `arrow::Table` gained
    a `use_threads` argument. When set to `TRUE`, the columns of a record batch
    or table are converted to R vectors in parallel.
    
    We cannot allocate R data structures in parallel (including scalar strings),
    so the conversion proceeds like this:
    
    ```
    for each column:
      - allocate the R vector that will host the data for the array
      - if the filling can be done in parallel, fill the R vector with data
        from the array

    fill serially all the columns that could not be filled in parallel

    wait for all columns to be filled
    ```
    
    I believe this is better (although perhaps harder to explain) than:
      - allocate all the vectors
      - fill them in parallel
    because we don't have to wait for all the vectors to be allocated before we
    start filling them.
    
    I believe the Python implementation does the latter, in
    `DataFrameBlockCreator::Convert`:
    
    ```
        RETURN_NOT_OK(CreateBlocks());
        RETURN_NOT_OK(WriteTableToBlocks());
    ```
    
    I've had to split the implementation of `Array__as_vector` into two steps:

     - Allocate: this must happen on the main thread (otherwise we would need to
       protect R with a mutex)
     - Ingest: for most array types, this can be done in parallel
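
    From the R side, a minimal usage sketch of the resulting API (the feather
    file path is hypothetical; the `use_threads` argument and the thread pool
    helpers are what this patch adds):

    ```r
    library(arrow)

    # size the global CPU thread pool used for the parallel conversion
    SetCpuThreadPoolCapacity(4L)
    GetCpuThreadPoolCapacity()

    # keep the arrow::Table, then convert its columns to R vectors in parallel
    tab <- read_feather("example.feather", as_tibble = FALSE)
    df  <- as_tibble(tab, use_threads = TRUE)

    # serial conversion is still available
    df_serial <- as_tibble(tab, use_threads = FALSE)
    ```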
    
    Author: Romain Francois <rom...@purrple.cat>
    
    Closes #3332 from romainfrancois/2968/threads and squashes the following commits:
    
    8261f2907 <Romain Francois> sprinkle use_threads in functions that call as_tibble()
    3205de2d8 <Romain Francois> lint
    590baf5a6 <Romain Francois> using string_view
    cd0dd343e <Romain Francois> no need for checkBuffers
    29546cd5d <Romain Francois> Some more refactoring of the Converters
    5557b7974 <Romain Francois> refactor the Converter api, so that all Converters are implementations of the base class Converter.
    e2ed26b78 <Romain Francois> lint
    2a5815e03 <Romain Francois> moving parallel_ingest() to a static method of the Converter classes
    2613d4ec4 <Romain Francois> null_count already local variable
    62a842054 <Romain Francois> + to_r_index lambda, with comment about why +1
    52c725fc8 <Romain Francois> default_value() marked constexpr
    11e82e769 <Romain Francois> lint
    d22b9c551 <Romain Francois> parallel version of Table__to_dataframe
    2455bd057 <Romain Francois> parallel version of RecordBatch__to_dataframe
    380d3a5bc <Romain Francois> simplify ArrayVector__as_vector.
    85881a3e2 <Romain Francois> simplify ArrayVector_To_Vector
    7074b36e9 <Romain Francois> reinstate Converter_Timestamp so that ArrayVector__as_vector can be simplified
    cf7e76bae <Romain Francois> + parallel_ingest<Converter>() to indicate if ingest for a given converter can be done in parallel
    baaaefe1b <Romain Francois> Rework Converter api
    e650b7934 <Romain Francois> + arrow::r::inspect(SEXP) for debugging
    a335dfdfc <Romain Francois> Factor out Array -> R vector code in separate file
    1212e28a9 <Romain Francois> <Converter>.Ingest() return an Invalid status instead of throwing an exception
    39bf76403 <Romain Francois> <Converter>.Ingest() return a Status instead of void
    f68b79376 <Romain Francois> replaced DictionaryArrays_to_Vector and Converter_Dictionary_Int32Indices by Converter_Dictionary
    d25a0e6b5 <Romain Francois> replace Date32ArrayVector_to_Vector by Converter_Date32
    85e48c0c7 <Romain Francois> lint
    18b921e6f <Romain Francois> + Get/Set ThreadPoolCapacity
---
 r/NAMESPACE                                        |   2 +
 r/R/RcppExports.R                                  |  57 +-
 r/R/RecordBatch.R                                  |   4 +-
 r/R/Table.R                                        |   4 +-
 r/R/feather.R                                      |   5 +-
 r/R/parquet.R                                      |   5 +-
 r/R/read_table.R                                   |   4 +-
 r/man/GetCpuThreadPoolCapacity.Rd                  |  18 +
 r/man/SetCpuThreadPoolCapacity.Rd                  |  17 +
 r/man/read_feather.Rd                              |   5 +-
 r/man/read_parquet.Rd                              |   4 +-
 r/man/read_table.Rd                                |   4 +-
 r/src/RcppExports.cpp                              | 120 ++--
 r/src/array.cpp                                    | 496 ---------------
 r/src/array__to_vector.cpp                         | 697 +++++++++++++++++++++
 r/src/arrow_types.h                                |  12 +-
 r/src/recordbatch.cpp                              |  16 -
 r/src/symbols.cpp                                  |   9 +
 r/src/table.cpp                                    |  17 -
 r/src/threadpool.cpp                               |  44 ++
 r/tests/testthat/test-RecordBatch.R                |   1 -
 .../testthat/test-cputhreadpoolcapacity.R}         |  25 +-
 22 files changed, 942 insertions(+), 624 deletions(-)

diff --git a/r/NAMESPACE b/r/NAMESPACE
index f8f6384..7fd76c7 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -80,6 +80,7 @@ export(FeatherTableWriter)
 export(FileMode)
 export(FileOutputStream)
 export(FixedSizeBufferWriter)
+export(GetCpuThreadPoolCapacity)
 export(MessageReader)
 export(MessageType)
 export(MockOutputStream)
@@ -88,6 +89,7 @@ export(RecordBatchFileReader)
 export(RecordBatchFileWriter)
 export(RecordBatchStreamReader)
 export(RecordBatchStreamWriter)
+export(SetCpuThreadPoolCapacity)
 export(StatusCode)
 export(TimeUnit)
 export(Type)
diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R
index c6fe871..51ed4ea 100644
--- a/r/R/RcppExports.R
+++ b/r/R/RcppExports.R
@@ -5,14 +5,6 @@ Array__from_vector <- function(x) {
     .Call(`_arrow_Array__from_vector`, x)
 }
 
-Array__as_vector <- function(array) {
-    .Call(`_arrow_Array__as_vector`, array)
-}
-
-ChunkedArray__as_vector <- function(chunked_array) {
-    .Call(`_arrow_ChunkedArray__as_vector`, chunked_array)
-}
-
 Array__Slice1 <- function(array, offset) {
     .Call(`_arrow_Array__Slice1`, array, offset)
 }
@@ -81,6 +73,22 @@ DictionaryArray__dictionary <- function(array) {
     .Call(`_arrow_DictionaryArray__dictionary`, array)
 }
 
+Array__as_vector <- function(array) {
+    .Call(`_arrow_Array__as_vector`, array)
+}
+
+ChunkedArray__as_vector <- function(chunked_array) {
+    .Call(`_arrow_ChunkedArray__as_vector`, chunked_array)
+}
+
+RecordBatch__to_dataframe <- function(batch, use_threads) {
+    .Call(`_arrow_RecordBatch__to_dataframe`, batch, use_threads)
+}
+
+Table__to_dataframe <- function(table, use_threads) {
+    .Call(`_arrow_Table__to_dataframe`, table, use_threads)
+}
+
 ArrayData__get_type <- function(x) {
     .Call(`_arrow_ArrayData__get_type`, x)
 }
@@ -661,10 +669,6 @@ RecordBatch__column <- function(batch, i) {
     .Call(`_arrow_RecordBatch__column`, batch, i)
 }
 
-RecordBatch__to_dataframe <- function(batch) {
-    .Call(`_arrow_RecordBatch__to_dataframe`, batch)
-}
-
 RecordBatch__from_dataframe <- function(tbl) {
     .Call(`_arrow_RecordBatch__from_dataframe`, tbl)
 }
@@ -781,10 +785,6 @@ Table__schema <- function(x) {
     .Call(`_arrow_Table__schema`, x)
 }
 
-Table__to_dataframe <- function(table) {
-    .Call(`_arrow_Table__to_dataframe`, table)
-}
-
 Table__column <- function(table, i) {
     .Call(`_arrow_Table__column`, table, i)
 }
@@ -793,3 +793,28 @@ Table__columns <- function(table) {
     .Call(`_arrow_Table__columns`, table)
 }
 
+#' Get the capacity of the global thread pool
+#'
+#' @return the number of worker threads in the thread pool to which
+#' Arrow dispatches various CPU-bound tasks. This is an ideal number,
+#' not necessarily the exact number of threads at a given point in time.
+#'
+#' You can change this number using [SetCpuThreadPoolCapacity()].
+#'
+#' @export
+GetCpuThreadPoolCapacity <- function() {
+    .Call(`_arrow_GetCpuThreadPoolCapacity`)
+}
+
+#' Set the capacity of the global thread pool
+#'
+#' @param threads the number of worker threads in the thread pool to which
+#' Arrow dispatches various CPU-bound tasks.
+#'
+#' The current number is returned by [GetCpuThreadPoolCapacity()]
+#'
+#' @export
+SetCpuThreadPoolCapacity <- function(threads) {
+    invisible(.Call(`_arrow_SetCpuThreadPoolCapacity`, threads))
+}
+
diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R
index fed10ab..9872117 100644
--- a/r/R/RecordBatch.R
+++ b/r/R/RecordBatch.R
@@ -80,8 +80,8 @@
 }
 
 #' @export
-`as_tibble.arrow::RecordBatch` <- function(x, ...){
-  RecordBatch__to_dataframe(x)
+`as_tibble.arrow::RecordBatch` <- function(x, use_threads = TRUE, ...){
+  RecordBatch__to_dataframe(x, use_threads = use_threads)
 }
 
 #' Create an [arrow::RecordBatch][arrow__RecordBatch] from a data frame
diff --git a/r/R/Table.R b/r/R/Table.R
index 8972634..c39fce2 100644
--- a/r/R/Table.R
+++ b/r/R/Table.R
@@ -61,6 +61,6 @@ table <- function(.data){
 }
 
 #' @export
-`as_tibble.arrow::Table` <- function(x, ...){
-  Table__to_dataframe(x)
+`as_tibble.arrow::Table` <- function(x, use_threads = TRUE, ...){
+  Table__to_dataframe(x, use_threads = use_threads)
 }
diff --git a/r/R/feather.R b/r/R/feather.R
index 0646521..eaeea4c 100644
--- a/r/R/feather.R
+++ b/r/R/feather.R
@@ -154,15 +154,16 @@ FeatherTableReader.fs_path <- function(file, mmap = TRUE, ...) {
 #' @param file a arrow::ipc::feather::TableReader or whatever the [FeatherTableReader()] function can handle
 #' @param columns names if the columns to read. The default `NULL` means all columns
 #' @param as_tibble should the [arrow::Table][arrow__Table] be converted to a tibble.
+#' @param use_threads Use threads when converting to a tibble.
 #' @param ... additional parameters
 #'
 #' @return a data frame if `as_tibble` is `TRUE` (the default), or a [arrow::Table][arrow__Table] otherwise
 #'
 #' @export
-read_feather <- function(file, columns = NULL, as_tibble = TRUE, ...){
+read_feather <- function(file, columns = NULL, as_tibble = TRUE, use_threads = TRUE, ...){
   out <- FeatherTableReader(file, ...)$Read(columns)
   if (isTRUE(as_tibble)) {
-    out <- as_tibble(out)
+    out <- as_tibble(out, use_threads = use_threads)
   }
   out
 }
diff --git a/r/R/parquet.R b/r/R/parquet.R
index 141da7b..6a393e2 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -19,15 +19,16 @@
 #'
 #' @param file a file path
 #' @param as_tibble should the [arrow::Table][arrow__Table] be converted to a tibble.
+#' @param use_threads Use threads when converting to a tibble, only relevant if `as_tibble` is `TRUE`
 #' @param ... currently ignored
 #'
 #' @return a [arrow::Table][arrow__Table], or a data frame if `as_tibble` is `TRUE`.
 #'
 #' @export
-read_parquet <- function(file, as_tibble = TRUE, ...) {
+read_parquet <- function(file, as_tibble = TRUE, use_threads = TRUE, ...) {
   tab <- shared_ptr(`arrow::Table`, read_parquet_file(f))
   if (isTRUE(as_tibble)) {
-    tab <- as_tibble(tab)
+    tab <- as_tibble(tab, use_threads = use_threads)
   }
   tab
 }
diff --git a/r/R/read_table.R b/r/R/read_table.R
index a540a42..260c50f 100644
--- a/r/R/read_table.R
+++ b/r/R/read_table.R
@@ -33,6 +33,8 @@
 #'
 #'  - a raw vector: read using a [arrow::ipc::RecordBatchStreamReader][arrow__ipc__RecordBatchStreamReader]
 #'
+#' @param use_threads Use threads when converting to a tibble
+#'
 #' @return
 #'
 #'  - `read_table` returns an [arrow::Table][arrow__Table]
@@ -81,6 +83,6 @@ read_table.fs_path <- function(stream) {
 
 #' @rdname read_table
 #' @export
-read_arrow <- function(stream){
+read_arrow <- function(stream, use_threads = TRUE){
   as_tibble(read_table(stream))
 }
diff --git a/r/man/GetCpuThreadPoolCapacity.Rd b/r/man/GetCpuThreadPoolCapacity.Rd
new file mode 100644
index 0000000..8bf0a6f
--- /dev/null
+++ b/r/man/GetCpuThreadPoolCapacity.Rd
@@ -0,0 +1,18 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RcppExports.R
+\name{GetCpuThreadPoolCapacity}
+\alias{GetCpuThreadPoolCapacity}
+\title{Get the capacity of the global thread pool}
+\usage{
+GetCpuThreadPoolCapacity()
+}
+\value{
+the number of worker threads in the thread pool to which
+Arrow dispatches various CPU-bound tasks. This is an ideal number,
+not necessarily the exact number of threads at a given point in time.
+
+You can change this number using \code{\link[=SetCpuThreadPoolCapacity]{SetCpuThreadPoolCapacity()}}.
+}
+\description{
+Get the capacity of the global thread pool
+}
diff --git a/r/man/SetCpuThreadPoolCapacity.Rd b/r/man/SetCpuThreadPoolCapacity.Rd
new file mode 100644
index 0000000..3a06dd5
--- /dev/null
+++ b/r/man/SetCpuThreadPoolCapacity.Rd
@@ -0,0 +1,17 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RcppExports.R
+\name{SetCpuThreadPoolCapacity}
+\alias{SetCpuThreadPoolCapacity}
+\title{Set the capacity of the global thread pool}
+\usage{
+SetCpuThreadPoolCapacity(threads)
+}
+\arguments{
+\item{threads}{the number of worker threads in the thread pool to which
+Arrow dispatches various CPU-bound tasks.
+
+The current number is returned by \code{\link[=GetCpuThreadPoolCapacity]{GetCpuThreadPoolCapacity()}}}
+}
+\description{
+Set the capacity of the global thread pool
+}
diff --git a/r/man/read_feather.Rd b/r/man/read_feather.Rd
index 31fd36a..4509c7d 100644
--- a/r/man/read_feather.Rd
+++ b/r/man/read_feather.Rd
@@ -4,7 +4,8 @@
 \alias{read_feather}
 \title{Read a feather file}
 \usage{
-read_feather(file, columns = NULL, as_tibble = TRUE, ...)
+read_feather(file, columns = NULL, as_tibble = TRUE,
+  use_threads = TRUE, ...)
 }
 \arguments{
 \item{file}{a arrow::ipc::feather::TableReader or whatever the \code{\link[=FeatherTableReader]{FeatherTableReader()}} function can handle}
@@ -13,6 +14,8 @@ read_feather(file, columns = NULL, as_tibble = TRUE, ...)
 
 \item{as_tibble}{should the \link[=arrow__Table]{arrow::Table} be converted to a tibble.}
 
+\item{use_threads}{Use threads when converting to a tibble.}
+
 \item{...}{additional parameters}
 }
 \value{
diff --git a/r/man/read_parquet.Rd b/r/man/read_parquet.Rd
index c29e18b..a4f294b 100644
--- a/r/man/read_parquet.Rd
+++ b/r/man/read_parquet.Rd
@@ -4,13 +4,15 @@
 \alias{read_parquet}
 \title{Read parquet file from disk}
 \usage{
-read_parquet(file, as_tibble = TRUE, ...)
+read_parquet(file, as_tibble = TRUE, use_threads = TRUE, ...)
 }
 \arguments{
 \item{file}{a file path}
 
 \item{as_tibble}{should the \link[=arrow__Table]{arrow::Table} be converted to a tibble.}
 
+\item{use_threads}{Use threads when converting to a tibble, only relevant if \code{as_tibble} is \code{TRUE}}
+
 \item{...}{currently ignored}
 }
 \value{
diff --git a/r/man/read_table.Rd b/r/man/read_table.Rd
index 3231b26..356ec5e 100644
--- a/r/man/read_table.Rd
+++ b/r/man/read_table.Rd
@@ -7,7 +7,7 @@
 \usage{
 read_table(stream)
 
-read_arrow(stream)
+read_arrow(stream, use_threads = TRUE)
 }
 \arguments{
 \item{stream}{stream.
@@ -23,6 +23,8 @@ binary file format, and uses a 
\link[=arrow__ipc__RecordBatchFileReader]{arrow::
 to process it.
 \item a raw vector: read using a \link[=arrow__ipc__RecordBatchStreamReader]{arrow::ipc::RecordBatchStreamReader}
 }}
+
+\item{use_threads}{Use threads when converting to a tibble}
 }
 \value{
 \itemize{
diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp
index 1e8fed1..a31c401 100644
--- a/r/src/RcppExports.cpp
+++ b/r/src/RcppExports.cpp
@@ -17,28 +17,6 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
-// Array__as_vector
-SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array);
-RcppExport SEXP _arrow_Array__as_vector(SEXP arraySEXP) {
-BEGIN_RCPP
-    Rcpp::RObject rcpp_result_gen;
-    Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Array>& 
>::type array(arraySEXP);
-    rcpp_result_gen = Rcpp::wrap(Array__as_vector(array));
-    return rcpp_result_gen;
-END_RCPP
-}
-// ChunkedArray__as_vector
-SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& 
chunked_array);
-RcppExport SEXP _arrow_ChunkedArray__as_vector(SEXP chunked_arraySEXP) {
-BEGIN_RCPP
-    Rcpp::RObject rcpp_result_gen;
-    Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::ChunkedArray>& 
>::type chunked_array(chunked_arraySEXP);
-    rcpp_result_gen = Rcpp::wrap(ChunkedArray__as_vector(chunked_array));
-    return rcpp_result_gen;
-END_RCPP
-}
 // Array__Slice1
 std::shared_ptr<arrow::Array> Array__Slice1(const 
std::shared_ptr<arrow::Array>& array, int offset);
 RcppExport SEXP _arrow_Array__Slice1(SEXP arraySEXP, SEXP offsetSEXP) {
@@ -237,6 +215,52 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
+// Array__as_vector
+SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array);
+RcppExport SEXP _arrow_Array__as_vector(SEXP arraySEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Array>& 
>::type array(arraySEXP);
+    rcpp_result_gen = Rcpp::wrap(Array__as_vector(array));
+    return rcpp_result_gen;
+END_RCPP
+}
+// ChunkedArray__as_vector
+SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& 
chunked_array);
+RcppExport SEXP _arrow_ChunkedArray__as_vector(SEXP chunked_arraySEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::ChunkedArray>& 
>::type chunked_array(chunked_arraySEXP);
+    rcpp_result_gen = Rcpp::wrap(ChunkedArray__as_vector(chunked_array));
+    return rcpp_result_gen;
+END_RCPP
+}
+// RecordBatch__to_dataframe
+List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& 
batch, bool use_threads);
+RcppExport SEXP _arrow_RecordBatch__to_dataframe(SEXP batchSEXP, SEXP 
use_threadsSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>& 
>::type batch(batchSEXP);
+    Rcpp::traits::input_parameter< bool >::type use_threads(use_threadsSEXP);
+    rcpp_result_gen = Rcpp::wrap(RecordBatch__to_dataframe(batch, 
use_threads));
+    return rcpp_result_gen;
+END_RCPP
+}
+// Table__to_dataframe
+List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table, bool 
use_threads);
+RcppExport SEXP _arrow_Table__to_dataframe(SEXP tableSEXP, SEXP 
use_threadsSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type table(tableSEXP);
+    Rcpp::traits::input_parameter< bool >::type use_threads(use_threadsSEXP);
+    rcpp_result_gen = Rcpp::wrap(Table__to_dataframe(table, use_threads));
+    return rcpp_result_gen;
+END_RCPP
+}
 // ArrayData__get_type
 std::shared_ptr<arrow::DataType> ArrayData__get_type(const 
std::shared_ptr<arrow::ArrayData>& x);
 RcppExport SEXP _arrow_ArrayData__get_type(SEXP xSEXP) {
@@ -1846,17 +1870,6 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
-// RecordBatch__to_dataframe
-List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& 
batch);
-RcppExport SEXP _arrow_RecordBatch__to_dataframe(SEXP batchSEXP) {
-BEGIN_RCPP
-    Rcpp::RObject rcpp_result_gen;
-    Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>& 
>::type batch(batchSEXP);
-    rcpp_result_gen = Rcpp::wrap(RecordBatch__to_dataframe(batch));
-    return rcpp_result_gen;
-END_RCPP
-}
 // RecordBatch__from_dataframe
 std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl);
 RcppExport SEXP _arrow_RecordBatch__from_dataframe(SEXP tblSEXP) {
@@ -2185,17 +2198,6 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
-// Table__to_dataframe
-List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table);
-RcppExport SEXP _arrow_Table__to_dataframe(SEXP tableSEXP) {
-BEGIN_RCPP
-    Rcpp::RObject rcpp_result_gen;
-    Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type table(tableSEXP);
-    rcpp_result_gen = Rcpp::wrap(Table__to_dataframe(table));
-    return rcpp_result_gen;
-END_RCPP
-}
 // Table__column
 std::shared_ptr<arrow::Column> Table__column(const 
std::shared_ptr<arrow::Table>& table, int i);
 RcppExport SEXP _arrow_Table__column(SEXP tableSEXP, SEXP iSEXP) {
@@ -2219,11 +2221,29 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
+// GetCpuThreadPoolCapacity
+int GetCpuThreadPoolCapacity();
+RcppExport SEXP _arrow_GetCpuThreadPoolCapacity() {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    rcpp_result_gen = Rcpp::wrap(GetCpuThreadPoolCapacity());
+    return rcpp_result_gen;
+END_RCPP
+}
+// SetCpuThreadPoolCapacity
+void SetCpuThreadPoolCapacity(int threads);
+RcppExport SEXP _arrow_SetCpuThreadPoolCapacity(SEXP threadsSEXP) {
+BEGIN_RCPP
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< int >::type threads(threadsSEXP);
+    SetCpuThreadPoolCapacity(threads);
+    return R_NilValue;
+END_RCPP
+}
 
 static const R_CallMethodDef CallEntries[] = {
     {"_arrow_Array__from_vector", (DL_FUNC) &_arrow_Array__from_vector, 1},
-    {"_arrow_Array__as_vector", (DL_FUNC) &_arrow_Array__as_vector, 1},
-    {"_arrow_ChunkedArray__as_vector", (DL_FUNC) 
&_arrow_ChunkedArray__as_vector, 1},
     {"_arrow_Array__Slice1", (DL_FUNC) &_arrow_Array__Slice1, 2},
     {"_arrow_Array__Slice2", (DL_FUNC) &_arrow_Array__Slice2, 3},
     {"_arrow_Array__IsNull", (DL_FUNC) &_arrow_Array__IsNull, 2},
@@ -2241,6 +2261,10 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_Array__Mask", (DL_FUNC) &_arrow_Array__Mask, 1},
     {"_arrow_DictionaryArray__indices", (DL_FUNC) 
&_arrow_DictionaryArray__indices, 1},
     {"_arrow_DictionaryArray__dictionary", (DL_FUNC) 
&_arrow_DictionaryArray__dictionary, 1},
+    {"_arrow_Array__as_vector", (DL_FUNC) &_arrow_Array__as_vector, 1},
+    {"_arrow_ChunkedArray__as_vector", (DL_FUNC) 
&_arrow_ChunkedArray__as_vector, 1},
+    {"_arrow_RecordBatch__to_dataframe", (DL_FUNC) 
&_arrow_RecordBatch__to_dataframe, 2},
+    {"_arrow_Table__to_dataframe", (DL_FUNC) &_arrow_Table__to_dataframe, 2},
     {"_arrow_ArrayData__get_type", (DL_FUNC) &_arrow_ArrayData__get_type, 1},
     {"_arrow_ArrayData__get_length", (DL_FUNC) &_arrow_ArrayData__get_length, 
1},
     {"_arrow_ArrayData__get_null_count", (DL_FUNC) 
&_arrow_ArrayData__get_null_count, 1},
@@ -2386,7 +2410,6 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1},
     {"_arrow_RecordBatch__columns", (DL_FUNC) &_arrow_RecordBatch__columns, 1},
     {"_arrow_RecordBatch__column", (DL_FUNC) &_arrow_RecordBatch__column, 2},
-    {"_arrow_RecordBatch__to_dataframe", (DL_FUNC) 
&_arrow_RecordBatch__to_dataframe, 1},
     {"_arrow_RecordBatch__from_dataframe", (DL_FUNC) 
&_arrow_RecordBatch__from_dataframe, 1},
     {"_arrow_RecordBatch__Equals", (DL_FUNC) &_arrow_RecordBatch__Equals, 2},
     {"_arrow_RecordBatch__RemoveColumn", (DL_FUNC) 
&_arrow_RecordBatch__RemoveColumn, 2},
@@ -2416,9 +2439,10 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1},
     {"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1},
     {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1},
-    {"_arrow_Table__to_dataframe", (DL_FUNC) &_arrow_Table__to_dataframe, 1},
     {"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2},
     {"_arrow_Table__columns", (DL_FUNC) &_arrow_Table__columns, 1},
+    {"_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) 
&_arrow_GetCpuThreadPoolCapacity, 0},
+    {"_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) 
&_arrow_SetCpuThreadPoolCapacity, 1},
     {NULL, NULL, 0}
 };
 
diff --git a/r/src/array.cpp b/r/src/array.cpp
index 901f2b6..dd0d7e6 100644
--- a/r/src/array.cpp
+++ b/r/src/array.cpp
@@ -33,9 +33,6 @@ inline bool isna<REALSXP>(double x) {
   return ISNA(x);
 }
 
-// the integer64 sentinel
-constexpr int64_t NA_INT64 = std::numeric_limits<int64_t>::min();
-
 template <int RTYPE, typename Type>
 std::shared_ptr<Array> SimpleArray(SEXP x) {
   Rcpp::Vector<RTYPE> vec(x);
@@ -503,499 +500,6 @@ std::shared_ptr<arrow::Array> Array__from_vector(SEXP x) {
   return nullptr;
 }
 
-// ---------------------------- Array -> R vector
-
-namespace arrow {
-namespace r {
-
-template <typename Converter, typename... Args>
-SEXP ArrayVector_To_Vector(int64_t n, const ArrayVector& arrays, Args... args) 
{
-  Converter converter(n, std::forward<Args>(args)...);
-
-  R_xlen_t k = 0;
-  for (const auto& array : arrays) {
-    auto n_chunk = array->length();
-    converter.Ingest(array, k, n_chunk);
-    k += n_chunk;
-  }
-  return converter.data;
-}
-
-template <int RTYPE>
-struct Converter_SimpleArray {
-  using Vector = Rcpp::Vector<RTYPE>;
-
-  Converter_SimpleArray(R_xlen_t n) : data(no_init(n)) {}
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    using value_type = typename Vector::stored_type;
-    auto null_count = array->null_count();
-
-    if (n == null_count) {
-      std::fill_n(data.begin() + start, n, default_value<RTYPE>());
-    } else {
-      auto p_values = array->data()->GetValues<value_type>(1);
-      STOP_IF_NULL(p_values);
-
-      // first copy all the data
-      std::copy_n(p_values, n, data.begin() + start);
-
-      if (null_count) {
-        // then set the sentinel NA
-        arrow::internal::BitmapReader 
bitmap_reader(array->null_bitmap()->data(),
-                                                    array->offset(), n);
-
-        for (size_t i = 0; i < n; i++, bitmap_reader.Next()) {
-          if (bitmap_reader.IsNotSet()) {
-            data[i + start] = default_value<RTYPE>();
-          }
-        }
-      }
-    }
-  }
-
-  Vector data;
-};
-
-struct Converter_String {
-  Converter_String(R_xlen_t n) : data(n) {}
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    auto null_count = array->null_count();
-
-    if (null_count == n) {
-      std::fill_n(data.begin(), n, NA_STRING);
-    } else {
-      auto p_offset = array->data()->GetValues<int32_t>(1);
-      STOP_IF_NULL(p_offset);
-      auto p_data = array->data()->GetValues<char>(2, *p_offset);
-      if (!p_data) {
-        // There is an offset buffer, but the data buffer is null
-        // There is at least one value in the array and not all the values are 
null
-        // That means all values are empty strings so there is nothing to do
-        return;
-      }
-
-      if (null_count) {
-        // need to watch for nulls
-        arrow::internal::BitmapReader null_reader(array->null_bitmap_data(),
-                                                  array->offset(), n);
-        for (int i = 0; i < n; i++, null_reader.Next()) {
-          if (null_reader.IsSet()) {
-            auto diff = p_offset[i + 1] - p_offset[i];
-            SET_STRING_ELT(data, start + i, Rf_mkCharLenCE(p_data, diff, 
CE_UTF8));
-            p_data += diff;
-          } else {
-            SET_STRING_ELT(data, start + i, NA_STRING);
-          }
-        }
-
-      } else {
-        // no need to check for nulls
-        // TODO: altrep mark this as no na
-        for (int i = 0; i < n; i++) {
-          auto diff = p_offset[i + 1] - p_offset[i];
-          SET_STRING_ELT(data, start + i, Rf_mkCharLenCE(p_data, diff, 
CE_UTF8));
-          p_data += diff;
-        }
-      }
-    }
-  }
-
-  CharacterVector data;
-};
-
-struct Converter_Boolean {
-  Converter_Boolean(R_xlen_t n) : data(n) {}
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    auto null_count = array->null_count();
-
-    if (n == null_count) {
-      std::fill_n(data.begin() + start, n, NA_LOGICAL);
-    } else {
-      // process the data
-      auto p_data = array->data()->GetValues<uint8_t>(1, 0);
-      STOP_IF_NULL(p_data);
-
-      arrow::internal::BitmapReader data_reader(p_data, array->offset(), n);
-      for (size_t i = 0; i < n; i++, data_reader.Next()) {
-        data[start + i] = data_reader.IsSet();
-      }
-
-      // then the null bitmap if needed
-      if (null_count) {
-        arrow::internal::BitmapReader null_reader(array->null_bitmap()->data(),
-                                                  array->offset(), n);
-        for (size_t i = 0; i < n; i++, null_reader.Next()) {
-          if (null_reader.IsNotSet()) {
-            data[start + i] = NA_LOGICAL;
-          }
-        }
-      }
-    }
-  }
-
-  LogicalVector data;
-};
-
-template <typename Type>
-struct Converter_Dictionary_Int32Indices {
-  Converter_Dictionary_Int32Indices(R_xlen_t n, const 
std::shared_ptr<arrow::Array>& dict,
-                                    bool ordered)
-      : data(no_init(n)) {
-    data.attr("levels") = 
ArrayVector_To_Vector<Converter_String>(dict->length(), {dict});
-    if (ordered) {
-      data.attr("class") = CharacterVector::create("ordered", "factor");
-    } else {
-      data.attr("class") = "factor";
-    }
-  }
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    DictionaryArray* dict_array = static_cast<DictionaryArray*>(array.get());
-    using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type;
-    auto null_count = array->null_count();
-
-    if (n == null_count) {
-      std::fill_n(data.begin() + start, n, NA_INTEGER);
-    } else {
-      std::shared_ptr<Array> indices = dict_array->indices();
-      auto p_array = indices->data()->GetValues<value_type>(1);
-      STOP_IF_NULL(p_array);
-
-      if (array->null_count()) {
-        arrow::internal::BitmapReader 
bitmap_reader(indices->null_bitmap()->data(),
-                                                    indices->offset(), n);
-        for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_array) {
-          data[start + i] =
-              bitmap_reader.IsNotSet() ? NA_INTEGER : 
(static_cast<int>(*p_array) + 1);
-        }
-      } else {
-        std::transform(
-            p_array, p_array + n, data.begin() + start,
-            [](const value_type value) { return static_cast<int>(value) + 1; 
});
-      }
-    }
-  }
-
-  IntegerVector data;
-};
-
-struct Converter_Date64 {
-  Converter_Date64(R_xlen_t n) : data(n) {
-    data.attr("class") = CharacterVector::create("POSIXct", "POSIXt");
-  }
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    auto null_count = array->null_count();
-    if (null_count == n) {
-      std::fill_n(data.begin() + start, n, NA_REAL);
-    } else {
-      auto p_values = array->data()->GetValues<int64_t>(1);
-      STOP_IF_NULL(p_values);
-      auto p_vec = data.begin() + start;
-
-      // convert DATE64 milliseconds to R seconds (stored as double)
-      auto seconds = [](int64_t ms) { return static_cast<double>(ms / 1000); };
-
-      if (null_count) {
-        arrow::internal::BitmapReader 
bitmap_reader(array->null_bitmap()->data(),
-                                                    array->offset(), n);
-        for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_vec, 
++p_values) {
-          *p_vec = bitmap_reader.IsSet() ? seconds(*p_values) : NA_REAL;
-        }
-      } else {
-        std::transform(p_values, p_values + n, p_vec, seconds);
-      }
-    }
-  }
-
-  NumericVector data;
-};
-
-template <int RTYPE, typename Type>
-struct Converter_Promotion {
-  using r_stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
-  using value_type = typename TypeTraits<Type>::ArrayType::value_type;
-
-  Converter_Promotion(R_xlen_t n) : data(no_init(n)) {}
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    auto null_count = array->null_count();
-    if (null_count == n) {
-      std::fill_n(data.begin() + start, n, default_value<RTYPE>());
-    } else {
-      auto p_values = array->data()->GetValues<value_type>(1);
-      STOP_IF_NULL(p_values);
-
-      auto value_convert = [](value_type value) {
-        return static_cast<r_stored_type>(value);
-      };
-      if (null_count) {
-        internal::BitmapReader bitmap_reader(array->null_bitmap()->data(),
-                                             array->offset(), n);
-        for (size_t i = 0; i < n; i++, bitmap_reader.Next()) {
-          data[start + i] = bitmap_reader.IsNotSet() ? 
Rcpp::Vector<RTYPE>::get_na()
-                                                     : 
value_convert(p_values[i]);
-        }
-      } else {
-        std::transform(p_values, p_values + n, data.begin(), value_convert);
-      }
-    }
-  }
-
-  Rcpp::Vector<RTYPE> data;
-};
-
-template <typename value_type>
-struct Converter_Time {
-  Converter_Time(int64_t n, int32_t multiplier, CharacterVector classes)
-      : data(no_init(n)), multiplier_(multiplier) {
-    data.attr("class") = classes;
-  }
-
-  Converter_Time(int64_t n, int32_t multiplier)
-      : data(no_init(n)), multiplier_(multiplier) {
-    data.attr("class") = CharacterVector::create("hms", "difftime");
-    data.attr("units") = "secs";
-  }
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    auto null_count = array->null_count();
-    if (n == null_count) {
-      std::fill_n(data.begin() + start, n, NA_REAL);
-    } else {
-      auto p_values = array->data()->GetValues<value_type>(1);
-      STOP_IF_NULL(p_values);
-      auto p_vec = data.begin() + start;
-      auto convert = [this](value_type value) {
-        return static_cast<double>(value) / multiplier_;
-      };
-      if (null_count) {
-        arrow::internal::BitmapReader 
bitmap_reader(array->null_bitmap()->data(),
-                                                    array->offset(), n);
-        for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_vec, 
++p_values) {
-          *p_vec = bitmap_reader.IsSet() ? convert(*p_values) : NA_REAL;
-        }
-      } else {
-        std::transform(p_values, p_values + n, p_vec, convert);
-      }
-    }
-  }
-
-  NumericVector data;
-  int32_t multiplier_;
-};
-
-template <typename value_type>
-struct Converter_TimeStamp : Converter_Time<value_type> {
-  Converter_TimeStamp(int64_t n, int32_t multiplier)
-      : Converter_Time<value_type>(n, multiplier,
-                                   CharacterVector::create("POSIXct", 
"POSIXt")) {}
-};
-
-struct Converter_Int64 {
-  Converter_Int64(R_xlen_t n) : data(no_init(n)) { data.attr("class") = 
"integer64"; }
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    auto null_count = array->null_count();
-    if (null_count == n) {
-      std::fill_n(reinterpret_cast<int64_t*>(data.begin()) + start, n, 
NA_INT64);
-    } else {
-      auto p_values = array->data()->GetValues<int64_t>(1);
-      STOP_IF_NULL(p_values);
-      auto p_vec = reinterpret_cast<int64_t*>(data.begin()) + start;
-
-      if (array->null_count()) {
-        internal::BitmapReader bitmap_reader(array->null_bitmap()->data(),
-                                             array->offset(), n);
-        for (size_t i = 0; i < n; i++, bitmap_reader.Next()) {
-          p_vec[i] = bitmap_reader.IsNotSet() ? NA_INT64 : p_values[i];
-        }
-      } else {
-        std::copy_n(p_values, n, p_vec);
-      }
-    }
-  }
-
-  NumericVector data;
-};
-
-SEXP DictionaryArrays_to_Vector(int64_t n, const ArrayVector& arrays) {
-  DictionaryArray* dict_array = static_cast<DictionaryArray*>(arrays[0].get());
-  auto dict = dict_array->dictionary();
-  auto indices = dict_array->indices();
-
-  if (dict->type_id() != Type::STRING) {
-    stop("Cannot convert Dictionary Array of type `%s` to R",
-         dict_array->type()->ToString());
-  }
-  bool ordered = dict_array->dict_type()->ordered();
-  switch (indices->type_id()) {
-    case Type::UINT8:
-      return 
ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::UInt8Type>>(
-          n, arrays, dict, ordered);
-
-    case Type::INT8:
-      return 
ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::Int8Type>>(
-          n, arrays, dict, ordered);
-
-    case Type::UINT16:
-      return 
ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::UInt16Type>>(
-          n, arrays, dict, ordered);
-
-    case Type::INT16:
-      return 
ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::Int16Type>>(
-          n, arrays, dict, ordered);
-
-    case Type::INT32:
-      return 
ArrayVector_To_Vector<Converter_Dictionary_Int32Indices<arrow::Int32Type>>(
-          n, arrays, dict, ordered);
-
-    default:
-      stop("Cannot convert Dictionary Array of type `%s` to R",
-           dict_array->type()->ToString());
-  }
-  return R_NilValue;
-}
-
-SEXP Date32ArrayVector_to_Vector(int64_t n, const ArrayVector& arrays) {
-  IntegerVector out(
-      arrow::r::ArrayVector_To_Vector<Converter_SimpleArray<INTSXP>>(n, 
arrays));
-  out.attr("class") = "Date";
-  return out;
-}
-
-struct Converter_Decimal {
-  Converter_Decimal(R_xlen_t n) : data(no_init(n)) {}
-
-  void Ingest(const std::shared_ptr<arrow::Array>& array, R_xlen_t start, 
R_xlen_t n) {
-    auto null_count = array->null_count();
-    if (n == null_count) {
-      std::fill_n(data.begin() + start, n, NA_REAL);
-    } else {
-      auto p_vec = reinterpret_cast<double*>(data.begin()) + start;
-      const auto& decimals_arr =
-          internal::checked_cast<const arrow::Decimal128Array&>(*array);
-
-      if (array->null_count()) {
-        internal::BitmapReader bitmap_reader(array->null_bitmap()->data(),
-                                             array->offset(), n);
-
-        for (size_t i = 0; i < n; i++, bitmap_reader.Next()) {
-          p_vec[i] = bitmap_reader.IsNotSet()
-                         ? NA_REAL
-                         : std::stod(decimals_arr.FormatValue(i).c_str());
-        }
-      } else {
-        for (size_t i = 0; i < n; i++) {
-          p_vec[i] = std::stod(decimals_arr.FormatValue(i).c_str());
-        }
-      }
-    }
-  }
-
-  NumericVector data;
-};
-
-}  // namespace r
-}  // namespace arrow
-
-SEXP ArrayVector__as_vector(int64_t n, const ArrayVector& arrays) {
-  using namespace arrow::r;
-
-  switch (arrays[0]->type_id()) {
-    // direct support
-    case Type::INT8:
-      return ArrayVector_To_Vector<Converter_SimpleArray<RAWSXP>>(n, arrays);
-    case Type::INT32:
-      return ArrayVector_To_Vector<Converter_SimpleArray<INTSXP>>(n, arrays);
-    case Type::DOUBLE:
-      return ArrayVector_To_Vector<Converter_SimpleArray<REALSXP>>(n, arrays);
-
-    // need to handle 1-bit case
-    case Type::BOOL:
-      return ArrayVector_To_Vector<Converter_Boolean>(n, arrays);
-
-      // handle memory dense strings
-    case Type::STRING:
-      return ArrayVector_To_Vector<Converter_String>(n, arrays);
-    case Type::DICTIONARY:
-      return DictionaryArrays_to_Vector(n, arrays);
-
-    case Type::DATE32:
-      return Date32ArrayVector_to_Vector(n, arrays);
-    case Type::DATE64:
-      return ArrayVector_To_Vector<Converter_Date64>(n, arrays);
-
-      // promotions to integer vector
-    case Type::UINT8:
-      return ArrayVector_To_Vector<Converter_Promotion<INTSXP, 
arrow::UInt8Type>>(n,
-                                                                               
   arrays);
-    case Type::INT16:
-      return ArrayVector_To_Vector<Converter_Promotion<INTSXP, 
arrow::Int16Type>>(n,
-                                                                               
   arrays);
-    case Type::UINT16:
-      return ArrayVector_To_Vector<Converter_Promotion<INTSXP, 
arrow::UInt16Type>>(
-          n, arrays);
-
-      // promotions to numeric vector
-    case Type::UINT32:
-      return ArrayVector_To_Vector<Converter_Promotion<REALSXP, 
arrow::UInt32Type>>(
-          n, arrays);
-    case Type::HALF_FLOAT:
-      return ArrayVector_To_Vector<Converter_Promotion<REALSXP, 
arrow::HalfFloatType>>(
-          n, arrays);
-    case Type::FLOAT:
-      return ArrayVector_To_Vector<Converter_Promotion<REALSXP, 
arrow::FloatType>>(
-          n, arrays);
-
-      // time32 ane time64
-    case Type::TIME32:
-      return ArrayVector_To_Vector<Converter_Time<int32_t>>(
-          n, arrays,
-          static_cast<TimeType*>(arrays[0]->type().get())->unit() == 
TimeUnit::SECOND
-              ? 1
-              : 1000);
-
-    case Type::TIME64:
-      return ArrayVector_To_Vector<Converter_Time<int64_t>>(
-          n, arrays,
-          static_cast<TimeType*>(arrays[0]->type().get())->unit() == 
TimeUnit::MICRO
-              ? 1000000
-              : 1000000000);
-
-    case Type::TIMESTAMP:
-      return ArrayVector_To_Vector<Converter_TimeStamp<int64_t>>(
-          n, arrays,
-          static_cast<TimeType*>(arrays[0]->type().get())->unit() == 
TimeUnit::MICRO
-              ? 1000000
-              : 1000000000);
-
-    case Type::INT64:
-      return ArrayVector_To_Vector<Converter_Int64>(n, arrays);
-    case Type::DECIMAL:
-      return ArrayVector_To_Vector<Converter_Decimal>(n, arrays);
-
-    default:
-      break;
-  }
-
-  stop(tfm::format("cannot handle Array of type %s", 
arrays[0]->type()->name()));
-  return R_NilValue;
-}
-
-// [[Rcpp::export]]
-SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) {
-  return ArrayVector__as_vector(array->length(), {array});
-}
-
-// [[Rcpp::export]]
-SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& 
chunked_array) {
-  return ArrayVector__as_vector(chunked_array->length(), 
chunked_array->chunks());
-}
-
 // [[Rcpp::export]]
 std::shared_ptr<arrow::Array> Array__Slice1(const 
std::shared_ptr<arrow::Array>& array,
                                             int offset) {
diff --git a/r/src/array__to_vector.cpp b/r/src/array__to_vector.cpp
new file mode 100644
index 0000000..c531933
--- /dev/null
+++ b/r/src/array__to_vector.cpp
@@ -0,0 +1,697 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/util/parallel.h>
+#include <arrow/util/task-group.h>
+#include "arrow_types.h"
+
+using namespace Rcpp;
+using namespace arrow;
+
+namespace arrow {
+namespace r {
+
+class Converter {
+ public:
+  Converter(const ArrayVector& arrays) : arrays_(arrays) {}
+
+  virtual ~Converter() {}
+
+  // Allocate a vector of the right R type for this converter
+  virtual SEXP Allocate(R_xlen_t n) const = 0;
+
+  // data[ start:(start + n) ] = NA
+  virtual Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const 
= 0;
+
+  // ingest the values from the array into data[ start : (start + n)]
+  virtual Status Ingest_some_nulls(SEXP data, const 
std::shared_ptr<arrow::Array>& array,
+                                   R_xlen_t start, R_xlen_t n) const = 0;
+
+  // ingest one array
+  Status IngestOne(SEXP data, const std::shared_ptr<arrow::Array>& array, 
R_xlen_t start,
+                   R_xlen_t n) const {
+    if (array->null_count() == n) {
+      return Ingest_all_nulls(data, start, n);
+    } else {
+      return Ingest_some_nulls(data, array, start, n);
+    }
+  }
+
+  // can this run in parallel ?
+  virtual bool Parallel() const { return true; }
+
+  // Ingest all the arrays serially
+  Status IngestSerial(SEXP data) {
+    R_xlen_t k = 0;
+    for (const auto& array : arrays_) {
+      auto n_chunk = array->length();
+      RETURN_NOT_OK(IngestOne(data, array, k, n_chunk));
+      k += n_chunk;
+    }
+    return Status::OK();
+  }
+
+  // ingest the arrays in parallel
+  //
+  // for each array, add a task to the task group
+  //
+  // The task group is Finish()ed in the caller
+  void IngestParallel(SEXP data, const 
std::shared_ptr<arrow::internal::TaskGroup>& tg) {
+    R_xlen_t k = 0;
+    for (const auto& array : arrays_) {
+      auto n_chunk = array->length();
+      tg->Append([=] { return IngestOne(data, array, k, n_chunk); });
+      k += n_chunk;
+    }
+  }
+
+  // Converter factory
+  static std::shared_ptr<Converter> Make(const ArrayVector& arrays);
+
+ protected:
+  const ArrayVector& arrays_;
+};
+
+// data[start:(start+n)] = NA
+template <int RTYPE>
+Status AllNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n) {
+  auto p_data = Rcpp::internal::r_vector_start<RTYPE>(data) + start;
+  std::fill_n(p_data, n, default_value<RTYPE>());
+  return Status::OK();
+}
+
+// ingest the data from `array` into a slice of `data`
+//
+// each element goes through `lambda` when some conversion is needed
+template <int RTYPE, typename array_value_type, typename Lambda>
+Status SomeNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n,
+                       const array_value_type* p_values,
+                       const std::shared_ptr<arrow::Array>& array, Lambda 
lambda) {
+  if (!p_values) {
+    return Status::Invalid("Invalid data buffer");
+  }
+  auto p_data = Rcpp::internal::r_vector_start<RTYPE>(data) + start;
+
+  if (array->null_count()) {
+    arrow::internal::BitmapReader bitmap_reader(array->null_bitmap()->data(),
+                                                array->offset(), n);
+    for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data, ++p_values) 
{
+      *p_data = bitmap_reader.IsSet() ? lambda(*p_values) : 
default_value<RTYPE>();
+    }
+  } else {
+    std::transform(p_values, p_values + n, p_data, lambda);
+  }
+
+  return Status::OK();
+}
+
+// Allocate + Ingest
+SEXP ArrayVector__as_vector(R_xlen_t n, const ArrayVector& arrays) {
+  auto converter = Converter::Make(arrays);
+  Shield<SEXP> data(converter->Allocate(n));
+  STOP_IF_NOT_OK(converter->IngestSerial(data));
+  return data;
+}
+
+template <int RTYPE>
+class Converter_SimpleArray : public Converter {
+  using Vector = Rcpp::Vector<RTYPE, Rcpp::NoProtectStorage>;
+  using value_type = typename Vector::stored_type;
+
+ public:
+  Converter_SimpleArray(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const { return Vector(no_init(n)); }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    return AllNull_Ingest<RTYPE>(data, start, n);
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& 
array,
+                           R_xlen_t start, R_xlen_t n) const {
+    auto p_values = array->data()->GetValues<value_type>(1);
+    auto echo = [](value_type value) { return value; };
+    return SomeNull_Ingest<RTYPE, value_type>(data, start, n, p_values, array, 
echo);
+  }
+};
+
+class Converter_Date32 : public Converter_SimpleArray<INTSXP> {
+ public:
+  Converter_Date32(const ArrayVector& arrays) : 
Converter_SimpleArray<INTSXP>(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const {
+    IntegerVector data(no_init(n));
+    data.attr("class") = "Date";
+    return data;
+  }
+};
+
+struct Converter_String : public Converter {
+ public:
+  Converter_String(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const { return StringVector_(no_init(n)); }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    return AllNull_Ingest<STRSXP>(data, start, n);
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& 
array,
+                           R_xlen_t start, R_xlen_t n) const {
+    auto p_offset = array->data()->GetValues<int32_t>(1);
+    if (!p_offset) {
+      return Status::Invalid("Invalid offset buffer");
+    }
+    auto p_strings = array->data()->GetValues<char>(2, *p_offset);
+    if (!p_strings) {
+      // There is an offset buffer, but the data buffer is null
+      // There is at least one value in the array and not all the values are null
+      // That means all values are either empty strings or nulls so there is nothing to do
+
+      if (array->null_count()) {
+        arrow::internal::BitmapReader null_reader(array->null_bitmap_data(),
+                                                  array->offset(), n);
+        for (int i = 0; i < n; i++, null_reader.Next()) {
+          if (null_reader.IsNotSet()) {
+            SET_STRING_ELT(data, start + i, NA_STRING);
+          }
+        }
+      }
+      return Status::OK();
+    }
+
+    arrow::StringArray* string_array = 
static_cast<arrow::StringArray*>(array.get());
+    if (array->null_count()) {
+      // need to watch for nulls
+      arrow::internal::BitmapReader null_reader(array->null_bitmap_data(),
+                                                array->offset(), n);
+      for (int i = 0; i < n; i++, null_reader.Next()) {
+        if (null_reader.IsSet()) {
+          SET_STRING_ELT(data, start + i, 
r_string(string_array->GetString(i)));
+        } else {
+          SET_STRING_ELT(data, start + i, NA_STRING);
+        }
+      }
+
+    } else {
+      for (int i = 0; i < n; i++) {
+        SET_STRING_ELT(data, start + i, r_string(string_array->GetString(i)));
+      }
+    }
+
+    return Status::OK();
+  }
+
+  bool Parallel() const { return false; }
+
+  inline SEXP r_string(const arrow::util::string_view& view) const {
+    return Rf_mkCharLenCE(view.data(), view.size(), CE_UTF8);
+  }
+};
+
+class Converter_Boolean : public Converter {
+ public:
+  Converter_Boolean(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const { return LogicalVector_(no_init(n)); }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    return AllNull_Ingest<LGLSXP>(data, start, n);
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& 
array,
+                           R_xlen_t start, R_xlen_t n) const {
+    auto p_data = Rcpp::internal::r_vector_start<LGLSXP>(data) + start;
+    auto p_bools = array->data()->GetValues<uint8_t>(1, 0);
+    if (!p_bools) {
+      return Status::Invalid("Invalid data buffer");
+    }
+
+    arrow::internal::BitmapReader data_reader(p_bools, array->offset(), n);
+    if (array->null_count()) {
+      arrow::internal::BitmapReader null_reader(array->null_bitmap()->data(),
+                                                array->offset(), n);
+
+      for (size_t i = 0; i < n; i++, data_reader.Next(), null_reader.Next(), 
++p_data) {
+        *p_data = null_reader.IsSet() ? data_reader.IsSet() : NA_LOGICAL;
+      }
+    } else {
+      for (size_t i = 0; i < n; i++, data_reader.Next(), ++p_data) {
+        *p_data = data_reader.IsSet();
+      }
+    }
+
+    return Status::OK();
+  }
+};
+
+class Converter_Dictionary : public Converter {
+ public:
+  Converter_Dictionary(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const {
+    IntegerVector data(no_init(n));
+    auto dict_array = 
static_cast<DictionaryArray*>(Converter::arrays_[0].get());
+    auto dict = dict_array->dictionary();
+    auto indices = dict_array->indices();
+    switch (indices->type_id()) {
+      case Type::UINT8:
+      case Type::INT8:
+      case Type::UINT16:
+      case Type::INT16:
+      case Type::INT32:
+        break;
+      default:
+        stop("Cannot convert Dictionary Array of type `%s` to R",
+             dict_array->type()->ToString());
+    }
+
+    if (dict->type_id() != Type::STRING) {
+      stop("Cannot convert Dictionary Array of type `%s` to R",
+           dict_array->type()->ToString());
+    }
+    bool ordered = dict_array->dict_type()->ordered();
+
+    data.attr("levels") = ArrayVector__as_vector(dict->length(), {dict});
+    if (ordered) {
+      data.attr("class") = CharacterVector::create("ordered", "factor");
+    } else {
+      data.attr("class") = "factor";
+    }
+    return data;
+  }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    return AllNull_Ingest<INTSXP>(data, start, n);
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& 
array,
+                           R_xlen_t start, R_xlen_t n) const {
+    DictionaryArray* dict_array = static_cast<DictionaryArray*>(array.get());
+    auto indices = dict_array->indices();
+    switch (indices->type_id()) {
+      case Type::UINT8:
+        return Ingest_some_nulls_Impl<arrow::UInt8Type>(data, array, start, n);
+      case Type::INT8:
+        return Ingest_some_nulls_Impl<arrow::Int8Type>(data, array, start, n);
+      case Type::UINT16:
+        return Ingest_some_nulls_Impl<arrow::UInt16Type>(data, array, start, 
n);
+      case Type::INT16:
+        return Ingest_some_nulls_Impl<arrow::Int16Type>(data, array, start, n);
+      case Type::INT32:
+        return Ingest_some_nulls_Impl<arrow::Int32Type>(data, array, start, n);
+      default:
+        break;
+    }
+    return Status::OK();
+  }
+
+ private:
+  template <typename Type>
+  Status Ingest_some_nulls_Impl(SEXP data, const 
std::shared_ptr<arrow::Array>& array,
+                                R_xlen_t start, R_xlen_t n) const {
+    using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type;
+
+    std::shared_ptr<Array> indices =
+        static_cast<DictionaryArray*>(array.get())->indices();
+
+    // convert the 0-based indices from the arrow Array
+    // to 1-based indices used in R factors
+    auto to_r_index = [](value_type value) { return static_cast<int>(value) + 
1; };
+
+    return SomeNull_Ingest<INTSXP, value_type>(
+        data, start, n, indices->data()->GetValues<value_type>(1), indices, 
to_r_index);
+  }
+};
+
+double ms_to_seconds(int64_t ms) { return static_cast<double>(ms / 1000); }
+
+class Converter_Date64 : public Converter {
+ public:
+  Converter_Date64(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const {
+    NumericVector data(no_init(n));
+    data.attr("class") = CharacterVector::create("POSIXct", "POSIXt");
+    return data;
+  }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    return AllNull_Ingest<REALSXP>(data, start, n);
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& 
array,
+                           R_xlen_t start, R_xlen_t n) const {
+    auto convert = [](int64_t ms) { return static_cast<double>(ms / 1000); };
+    return SomeNull_Ingest<REALSXP, int64_t>(
+        data, start, n, array->data()->GetValues<int64_t>(1), array, convert);
+  }
+};
+
+template <int RTYPE, typename Type>
+class Converter_Promotion : public Converter {
+  using r_stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
+  using value_type = typename TypeTraits<Type>::ArrayType::value_type;
+
+ public:
+  Converter_Promotion(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const {
+    return Rcpp::Vector<RTYPE, NoProtectStorage>(no_init(n));
+  }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    return AllNull_Ingest<RTYPE>(data, start, n);
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& 
array,
+                           R_xlen_t start, R_xlen_t n) const {
+    auto convert = [](value_type value) { return 
static_cast<r_stored_type>(value); };
+    return SomeNull_Ingest<RTYPE, value_type>(
+        data, start, n, array->data()->GetValues<value_type>(1), array, 
convert);
+  }
+
+ private:
+  static r_stored_type value_convert(value_type value) {
+    return static_cast<r_stored_type>(value);
+  }
+};
+
+template <typename value_type>
+class Converter_Time : public Converter {
+ public:
+  Converter_Time(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const {
+    NumericVector data(no_init(n));
+    data.attr("class") = CharacterVector::create("hms", "difftime");
+    data.attr("units") = CharacterVector::create("secs");
+    return data;
+  }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    return AllNull_Ingest<REALSXP>(data, start, n);
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
+                           R_xlen_t start, R_xlen_t n) const {
+    int multiplier = TimeUnit_multiplier(array);
+    auto convert = [=](value_type value) {
+      return static_cast<double>(value) / multiplier;
+    };
+    return SomeNull_Ingest<REALSXP, value_type>(
+        data, start, n, array->data()->GetValues<value_type>(1), array, convert);
+  }
+
+ private:
+  int TimeUnit_multiplier(const std::shared_ptr<Array>& array) const {
+    switch (static_cast<TimeType*>(array->type().get())->unit()) {
+      case TimeUnit::SECOND:
+        return 1;
+      case TimeUnit::MILLI:
+        return 1000;
+      case TimeUnit::MICRO:
+        return 1000000;
+      case TimeUnit::NANO:
+        return 1000000000;
+    }
+  }
+};
+
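+// Timestamps reuse the time ingestion above (value scaled by the unit multiplier)
+// but are allocated as POSIXct vectors rather than hms/difftime.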
+template <typename value_type>
+class Converter_Timestamp : public Converter_Time<value_type> {
+ public:
+  Converter_Timestamp(const ArrayVector& arrays) : Converter_Time<value_type>(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const {
+    NumericVector data(no_init(n));
+    data.attr("class") = CharacterVector::create("POSIXct", "POSIXt");
+    return data;
+  }
+};
+
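+// Decimal128 values are formatted to their decimal string representation and
+// parsed into R doubles, so values beyond double precision are rounded.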
+class Converter_Decimal : public Converter {
+ public:
+  Converter_Decimal(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const { return NumericVector_(no_init(n)); }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    return AllNull_Ingest<REALSXP>(data, start, n);
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
+                           R_xlen_t start, R_xlen_t n) const {
+    auto p_data = Rcpp::internal::r_vector_start<REALSXP>(data) + start;
+    const auto& decimals_arr =
+        internal::checked_cast<const arrow::Decimal128Array&>(*array);
+
+    internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(),
+                                         n);
+
+    for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) {
+      *p_data = bitmap_reader.IsSet() ? std::stod(decimals_arr.FormatValue(i).c_str())
+                                      : NA_REAL;
+    }
+
+    return Status::OK();
+  }
+};
+
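+// 64 bit integers use the bit64::integer64 representation: the payload of an R
+// double vector reinterpreted as int64, with INT64_MIN (NA_INT64) as the NA sentinel.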
+class Converter_Int64 : public Converter {
+ public:
+  Converter_Int64(const ArrayVector& arrays) : Converter(arrays) {}
+
+  SEXP Allocate(R_xlen_t n) const {
+    NumericVector data(no_init(n));
+    data.attr("class") = "integer64";
+    return data;
+  }
+
+  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
+    auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start;
+    std::fill_n(p_data, n, NA_INT64);
+    return Status::OK();
+  }
+
+  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
+                           R_xlen_t start, R_xlen_t n) const {
+    auto p_values = array->data()->GetValues<int64_t>(1);
+    if (!p_values) {
+      return Status::Invalid("Invalid data buffer");
+    }
+
+    auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start;
+
+    if (array->null_count()) {
+      internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(),
+                                           n);
+      for (size_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) {
+        *p_data = bitmap_reader.IsSet() ? p_values[i] : NA_INT64;
+      }
+    } else {
+      std::copy_n(p_values, n, p_data);
+    }
+
+    return Status::OK();
+  }
+};
+
+std::shared_ptr<Converter> Converter::Make(const ArrayVector& arrays) {
+  using namespace arrow::r;
+
+  switch (arrays[0]->type_id()) {
+    // direct support
+    case Type::INT8:
+      return std::make_shared<Converter_SimpleArray<RAWSXP>>(arrays);
+
+    case Type::INT32:
+      return std::make_shared<Converter_SimpleArray<INTSXP>>(arrays);
+
+    case Type::DOUBLE:
+      return std::make_shared<Converter_SimpleArray<REALSXP>>(arrays);
+
+      // need to handle 1-bit case
+    case Type::BOOL:
+      return std::make_shared<Converter_Boolean>(arrays);
+
+      // handle memory dense strings
+    case Type::STRING:
+      return std::make_shared<Converter_String>(arrays);
+
+    case Type::DICTIONARY:
+      return std::make_shared<Converter_Dictionary>(arrays);
+
+    case Type::DATE32:
+      return std::make_shared<Converter_Date32>(arrays);
+
+    case Type::DATE64:
+      return std::make_shared<Converter_Date64>(arrays);
+
+      // promotions to integer vector
+    case Type::UINT8:
+      return std::make_shared<Converter_Promotion<INTSXP, arrow::UInt8Type>>(arrays);
+
+    case Type::INT16:
+      return std::make_shared<Converter_Promotion<INTSXP, arrow::Int16Type>>(arrays);
+
+    case Type::UINT16:
+      return std::make_shared<Converter_Promotion<INTSXP, arrow::UInt16Type>>(arrays);
+
+      // promotions to numeric vector
+    case Type::UINT32:
+      return std::make_shared<Converter_Promotion<REALSXP, arrow::UInt32Type>>(arrays);
+
+    case Type::HALF_FLOAT:
+      return std::make_shared<Converter_Promotion<REALSXP, arrow::HalfFloatType>>(arrays);
+
+    case Type::FLOAT:
+      return std::make_shared<Converter_Promotion<REALSXP, arrow::FloatType>>(arrays);
+
+      // time32 and time64
+    case Type::TIME32:
+      return std::make_shared<Converter_Time<int32_t>>(arrays);
+
+    case Type::TIME64:
+      return std::make_shared<Converter_Time<int64_t>>(arrays);
+
+    case Type::TIMESTAMP:
+      return std::make_shared<Converter_Timestamp<int64_t>>(arrays);
+
+    case Type::INT64:
+      return std::make_shared<Converter_Int64>(arrays);
+
+    case Type::DECIMAL:
+      return std::make_shared<Converter_Decimal>(arrays);
+
+    default:
+      break;
+  }
+
+  stop(tfm::format("cannot handle Array of type %s", arrays[0]->type()->name()));
+  return nullptr;
+}
+
+List to_dataframe_serial(int64_t nr, int64_t nc, const CharacterVector& names,
+                         const std::vector<std::shared_ptr<Converter>>& converters) {
+  List tbl(nc);
+
+  for (int i = 0; i < nc; i++) {
+    SEXP column = tbl[i] = converters[i]->Allocate(nr);
+    STOP_IF_NOT_OK(converters[i]->IngestSerial(column));
+  }
+  tbl.attr("names") = names;
+  tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame");
+  tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr);
+  return tbl;
+}
+
+List to_dataframe_parallel(int64_t nr, int64_t nc, const CharacterVector& names,
+                           const std::vector<std::shared_ptr<Converter>>& converters) {
+  List tbl(nc);
+
+  // task group to ingest data in parallel
+  auto tg = arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool());
+
+  // allocate all columns and immediately start ingesting those that
+  // can be filled in parallel, i.e. those whose ingestion does not
+  // need to happen on the main thread
+  for (int i = 0; i < nc; i++) {
+    // allocate data for column i
+    SEXP column = tbl[i] = converters[i]->Allocate(nr);
+
+    // add a task to ingest data of that column if that can be done in parallel
+    if (converters[i]->Parallel()) {
+      converters[i]->IngestParallel(column, tg);
+    }
+  }
+
+  arrow::Status status = arrow::Status::OK();
+
+  // ingest the columns that cannot be dealt with in parallel
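+  // (the R API is not thread safe, so these ingestions stay on the main thread)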
+  for (int i = 0; i < nc; i++) {
+    if (!converters[i]->Parallel()) {
+      status &= converters[i]->IngestSerial(tbl[i]);
+    }
+  }
+
+  // wait for the ingestion to be finished
+  status &= tg->Finish();
+
+  STOP_IF_NOT_OK(status);
+
+  tbl.attr("names") = names;
+  tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame");
+  tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr);
+
+  return tbl;
+}
+
+}  // namespace r
+}  // namespace arrow
+
+// [[Rcpp::export]]
+SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) {
+  return arrow::r::ArrayVector__as_vector(array->length(), {array});
+}
+
+// [[Rcpp::export]]
+SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array) {
+  return arrow::r::ArrayVector__as_vector(chunked_array->length(),
+                                          chunked_array->chunks());
+}
+
+// [[Rcpp::export]]
+List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch,
+                               bool use_threads) {
+  int64_t nc = batch->num_columns();
+  int64_t nr = batch->num_rows();
+  CharacterVector names(nc);
+  std::vector<ArrayVector> arrays(nc);
+  std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc);
+
+  for (int64_t i = 0; i < nc; i++) {
+    names[i] = batch->column_name(i);
+    arrays[i] = {batch->column(i)};
+    converters[i] = arrow::r::Converter::Make(arrays[i]);
+  }
+
+  if (use_threads) {
+    return arrow::r::to_dataframe_parallel(nr, nc, names, converters);
+  } else {
+    return arrow::r::to_dataframe_serial(nr, nc, names, converters);
+  }
+}
+
+// [[Rcpp::export]]
+List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table, bool use_threads) {
+  int64_t nc = table->num_columns();
+  int64_t nr = table->num_rows();
+  CharacterVector names(nc);
+  std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc);
+
+  for (int64_t i = 0; i < nc; i++) {
+    converters[i] = arrow::r::Converter::Make(table->column(i)->data()->chunks());
+    names[i] = table->column(i)->name();
+  }
+
+  if (use_threads) {
+    return arrow::r::to_dataframe_parallel(nr, nc, names, converters);
+  } else {
+    return arrow::r::to_dataframe_serial(nr, nc, names, converters);
+  }
+}
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 6fef799..a657731 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -52,6 +52,8 @@ namespace r {
 struct symbols {
   static SEXP units;
   static SEXP xp;
+  static SEXP dot_Internal;
+  static SEXP inspect;
 };
 }  // namespace r
 }  // namespace arrow
@@ -148,6 +150,7 @@ inline SEXP wrap_dispatch(const T& x, Rcpp::traits::wrap_type_unique_ptr_tag) {
 }  // namespace Rcpp
 
 namespace Rcpp {
+using NumericVector_ = Rcpp::Vector<REALSXP, Rcpp::NoProtectStorage>;
 using IntegerVector_ = Rcpp::Vector<INTSXP, Rcpp::NoProtectStorage>;
 using LogicalVector_ = Rcpp::Vector<LGLSXP, Rcpp::NoProtectStorage>;
 using StringVector_ = Rcpp::Vector<STRSXP, Rcpp::NoProtectStorage>;
@@ -156,11 +159,11 @@ using RawVector_ = Rcpp::Vector<RAWSXP, Rcpp::NoProtectStorage>;
 using List_ = Rcpp::Vector<VECSXP, Rcpp::NoProtectStorage>;
 
 template <int RTYPE>
-inline typename Rcpp::Vector<RTYPE>::stored_type default_value() {
+inline constexpr typename Rcpp::Vector<RTYPE>::stored_type default_value() {
   return Rcpp::Vector<RTYPE>::get_na();
 }
 template <>
-inline Rbyte default_value<RAWSXP>() {
+inline constexpr Rbyte default_value<RAWSXP>() {
   return 0;
 }
 
@@ -174,6 +177,11 @@ std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(Rcpp::DataFrame
 namespace arrow {
 namespace r {
 
+void inspect(SEXP obj);
+
+// the integer64 sentinel
+constexpr int64_t NA_INT64 = std::numeric_limits<int64_t>::min();
+
 template <int RTYPE, typename Vec = Rcpp::Vector<RTYPE>>
 class RBuffer : public MutableBuffer {
  public:
diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp
index b6bee7a..b776d2a 100644
--- a/r/src/recordbatch.cpp
+++ b/r/src/recordbatch.cpp
@@ -58,22 +58,6 @@ std::shared_ptr<arrow::Array> RecordBatch__column(
 }
 
 // [[Rcpp::export]]
-List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch) {
-  int nc = batch->num_columns();
-  int nr = batch->num_rows();
-  List tbl(nc);
-  CharacterVector names(nc);
-  for (int i = 0; i < nc; i++) {
-    tbl[i] = Array__as_vector(batch->column(i));
-    names[i] = batch->column_name(i);
-  }
-  tbl.attr("names") = names;
-  tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame");
-  tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr);
-  return tbl;
-}
-
-// [[Rcpp::export]]
 std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl) {
   CharacterVector names = tbl.names();
 
diff --git a/r/src/symbols.cpp b/r/src/symbols.cpp
index e60bcce..5b4e44e 100644
--- a/r/src/symbols.cpp
+++ b/r/src/symbols.cpp
@@ -21,5 +21,14 @@ namespace arrow {
 namespace r {
 SEXP symbols::units = Rf_install("units");
 SEXP symbols::xp = Rf_install(".:xp:.");
+SEXP symbols::dot_Internal = Rf_install(".Internal");
+SEXP symbols::inspect = Rf_install("inspect");
+
+void inspect(SEXP obj) {
+  Rcpp::Shield<SEXP> call_inspect(Rf_lang2(symbols::inspect, obj));
+  Rcpp::Shield<SEXP> call_internal(Rf_lang2(symbols::dot_Internal, call_inspect));
+  Rf_eval(call_internal, R_GlobalEnv);
+}
+
 }  // namespace r
 }  // namespace arrow
diff --git a/r/src/table.cpp b/r/src/table.cpp
index f4ebd04..fcf2a03 100644
--- a/r/src/table.cpp
+++ b/r/src/table.cpp
@@ -46,23 +46,6 @@ std::shared_ptr<arrow::Schema> Table__schema(const std::shared_ptr<arrow::Table>
 }
 
 // [[Rcpp::export]]
-List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table) {
-  int nc = table->num_columns();
-  int nr = table->num_rows();
-  List tbl(nc);
-  CharacterVector names(nc);
-  for (int i = 0; i < nc; i++) {
-    auto column = table->column(i);
-    tbl[i] = ChunkedArray__as_vector(column->data());
-    names[i] = column->name();
-  }
-  tbl.attr("names") = names;
-  tbl.attr("class") = CharacterVector::create("tbl_df", "tbl", "data.frame");
-  tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr);
-  return tbl;
-}
-
-// [[Rcpp::export]]
 std::shared_ptr<arrow::Column> Table__column(const std::shared_ptr<arrow::Table>& table,
                                              int i) {
   return table->column(i);
diff --git a/r/src/threadpool.cpp b/r/src/threadpool.cpp
new file mode 100644
index 0000000..1ce0451
--- /dev/null
+++ b/r/src/threadpool.cpp
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/util/parallel.h>
+#include "arrow_types.h"
+
+//' Get the capacity of the global thread pool
+//'
+//' @return the number of worker threads in the thread pool to which
+//' Arrow dispatches various CPU-bound tasks. This is an ideal number,
+//' not necessarily the exact number of threads at a given point in time.
+//'
+//' You can change this number using [SetCpuThreadPoolCapacity()].
+//'
+//' @export
+// [[Rcpp::export]]
+int GetCpuThreadPoolCapacity() { return arrow::GetCpuThreadPoolCapacity(); }
+
+//' Set the capacity of the global thread pool
+//'
+//' @param threads the number of worker threads in the thread pool to which
+//' Arrow dispatches various CPU-bound tasks.
+//'
+//' The current number is returned by [GetCpuThreadPoolCapacity()]
+//'
+//' @export
+// [[Rcpp::export]]
+void SetCpuThreadPoolCapacity(int threads) {
+  STOP_IF_NOT_OK(arrow::SetCpuThreadPoolCapacity(threads));
+}
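
A minimal usage sketch of the two helpers above (the capacity value 4L is
arbitrary; the round-trip mirrors the test added at the end of this patch):

```
old <- GetCpuThreadPoolCapacity()  # ideal number of worker threads currently configured
SetCpuThreadPoolCapacity(4L)       # resize the global CPU thread pool
stopifnot(GetCpuThreadPoolCapacity() == 4L)
SetCpuThreadPoolCapacity(old)      # restore the previous capacity
```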
diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R
index f40bd83..29f9094 100644
--- a/r/tests/testthat/test-RecordBatch.R
+++ b/r/tests/testthat/test-RecordBatch.R
@@ -69,7 +69,6 @@ test_that("RecordBatch", {
   expect_equal(col_fct$as_vector(), tbl$fct)
   expect_equal(col_fct$type, dictionary(int32(), array(letters[1:10])))
 
-
   batch2 <- batch$RemoveColumn(0)
   expect_equal(
     batch2$schema,
diff --git a/r/R/parquet.R b/r/tests/testthat/test-cputhreadpoolcapacity.R
similarity index 63%
copy from r/R/parquet.R
copy to r/tests/testthat/test-cputhreadpoolcapacity.R
index 141da7b..de23f15 100644
--- a/r/R/parquet.R
+++ b/r/tests/testthat/test-cputhreadpoolcapacity.R
@@ -15,19 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-#' Read parquet file from disk
-#'
-#' @param file a file path
-#' @param as_tibble should the [arrow::Table][arrow__Table] be converted to a tibble.
-#' @param ... currently ignored
-#'
-#' @return a [arrow::Table][arrow__Table], or a data frame if `as_tibble` is `TRUE`.
-#'
-#' @export
-read_parquet <- function(file, as_tibble = TRUE, ...) {
-  tab <- shared_ptr(`arrow::Table`, read_parquet_file(f))
-  if (isTRUE(as_tibble)) {
-    tab <- as_tibble(tab)
-  }
-  tab
-}
+context("CpuThreadPoolCapacity")
+
+test_that("can set/get cpu thread pool capacity", {
+  old <- GetCpuThreadPoolCapacity()
+  SetCpuThreadPoolCapacity(19L)
+  expect_equal(GetCpuThreadPoolCapacity(), 19L)
+  SetCpuThreadPoolCapacity(old)
+  expect_equal(GetCpuThreadPoolCapacity(), old)
+})
