[ 
https://issues.apache.org/jira/browse/ARROW-9888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Grove resolved ARROW-9888.
-------------------------------
    Fix Version/s: 2.0.0
       Resolution: Fixed

Issue resolved by pull request 8082
[https://github.com/apache/arrow/pull/8082]

> [Rust] [DataFusion] ExecutionContext can not be shared between threads
> ----------------------------------------------------------------------
>
>                 Key: ARROW-9888
>                 URL: https://issues.apache.org/jira/browse/ARROW-9888
>             Project: Apache Arrow
>          Issue Type: Bug
>            Reporter: Andrew Lamb
>            Assignee: Andrew Lamb
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> As suggested by Jorge on https://github.com/apache/arrow/pull/8079
> The high-level idea is to allow an ExecutionContext to be used in
> multi-threaded environments such as Python.
> The two use cases:
> 1. When a project is planning a complex set of plans that depend on a
> common set of sources and UDFs, it would be nice to be able to multi-thread
> the planning. This is particularly important when planning requires reading
> remote metadata (e.g. when the source is in S3 with many partitions).
> Metadata reading is often slow and network-bound, which makes threads
> suitable for these workloads. If multi-threading is not possible, either each
> plan needs to read the metadata independently (one context per plan) or
> planning must be sequential (with lots of network waiting).
> 2. When creating bindings to programming languages that support
> multi-threading, it would be nice for the ExecutionContext to be thread-safe,
> so that we can more easily integrate with those languages.
> The code might look like:
> {code}
> alamb@MacBook-Pro rust % git diff
> diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
> index 5f8aa342e..7374b0a78 100644
> --- a/rust/datafusion/src/execution/context.rs
> +++ b/rust/datafusion/src/execution/context.rs
> @@ -460,7 +460,7 @@ mod tests {
>      use arrow::array::{ArrayRef, Int32Array};
>      use arrow::compute::add;
>      use std::fs::File;
> -    use std::io::prelude::*;
> +    use std::{sync::Mutex, io::prelude::*};
>      use tempdir::TempDir;
>      use test::*;
>  
> @@ -928,6 +928,28 @@ mod tests {
>          Ok(())
>      }
>  
> +    #[test]
> +    fn send_context_to_threads() -> Result<()> {
> +        // ensure that ExecutionContext's can be read by multiple threads concurrently
> +        let tmp_dir = TempDir::new("send_context_to_threads")?;
> +        let partition_count = 4;
> +        let mut ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count)?));
> +
> +        let threads: Vec<JoinHandle<Result<_>>> = (0..2)
> +            .map(|_| { ctx.clone() })
> +            .map(|ctx_clone| thread::spawn(move || {
> +                let ctx = ctx_clone.lock().expect("Locked context");
> +                // Ensure we can create logical plan code on a separate thread.
> +                ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
> +            }))
> +            .collect();
> +
> +        for thread in threads {
> +            thread.join().expect("Failed to join thread")?;
> +        }
> +        Ok(())
> +    }
> +
>      #[test]
>      fn scalar_udf() -> Result<()> {
>          let schema = Schema::new(vec![
> {code}
> At the moment, Rust refuses to compile this example (and also refuses to
> share ExecutionContexts between threads) because several `dyn` trait objects
> held by the context are not marked as Send + Sync:
> {code}
>    Compiling datafusion v2.0.0-SNAPSHOT 
> (/Users/alamb/Software/arrow/rust/datafusion)
> error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` 
> cannot be sent between threads safely
>    --> datafusion/src/execution/context.rs:940:30
>     |
> 940 |             .map(|ctx_clone| thread::spawn(move || {
>     |                              ^^^^^^^^^^^^^ `(dyn 
> execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between 
> threads safely
>     | 
>    ::: 
> /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
>     |
> 616 |     F: Send + 'static,
>     |        ---- required by this bound in `std::thread::spawn`
>     |
>     = help: the trait `std::marker::Send` is not implemented for `(dyn 
> execution::physical_plan::PhysicalPlanner + 'static)`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for `std::sync::Arc<(dyn 
> execution::physical_plan::PhysicalPlanner + 'static)>`
>     = note: required because it appears within the type 
> `std::option::Option<std::sync::Arc<(dyn 
> execution::physical_plan::PhysicalPlanner + 'static)>>`
>     = note: required because it appears within the type 
> `execution::context::ExecutionConfig`
>     = note: required because it appears within the type 
> `execution::context::ExecutionContextState`
>     = note: required because it appears within the type 
> `execution::context::ExecutionContext`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for 
> `std::sync::Mutex<execution::context::ExecutionContext>`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for 
> `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
>     = note: required because it appears within the type 
> `[closure@datafusion/src/execution/context.rs:940:44: 944:14 
> ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
> error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` 
> cannot be shared between threads safely
>    --> datafusion/src/execution/context.rs:940:30
>     |
> 940 |             .map(|ctx_clone| thread::spawn(move || {
>     |                              ^^^^^^^^^^^^^ `(dyn 
> execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared 
> between threads safely
>     | 
>    ::: 
> /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
>     |
> 616 |     F: Send + 'static,
>     |        ---- required by this bound in `std::thread::spawn`
>     |
>     = help: the trait `std::marker::Sync` is not implemented for `(dyn 
> execution::physical_plan::PhysicalPlanner + 'static)`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for `std::sync::Arc<(dyn 
> execution::physical_plan::PhysicalPlanner + 'static)>`
>     = note: required because it appears within the type 
> `std::option::Option<std::sync::Arc<(dyn 
> execution::physical_plan::PhysicalPlanner + 'static)>>`
>     = note: required because it appears within the type 
> `execution::context::ExecutionConfig`
>     = note: required because it appears within the type 
> `execution::context::ExecutionContextState`
>     = note: required because it appears within the type 
> `execution::context::ExecutionContext`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for 
> `std::sync::Mutex<execution::context::ExecutionContext>`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for 
> `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
>     = note: required because it appears within the type 
> `[closure@datafusion/src/execution/context.rs:940:44: 944:14 
> ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
> error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot 
> be sent between threads safely
>    --> datafusion/src/execution/context.rs:940:30
>     |
> 940 |             .map(|ctx_clone| thread::spawn(move || {
>     |                              ^^^^^^^^^^^^^ `(dyn 
> datasource::datasource::TableProvider + 'static)` cannot be sent between 
> threads safely
>     | 
>    ::: 
> /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
>     |
> 616 |     F: Send + 'static,
>     |        ---- required by this bound in `std::thread::spawn`
>     |
>     = help: the trait `std::marker::Send` is not implemented for `(dyn 
> datasource::datasource::TableProvider + 'static)`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for `std::sync::Arc<(dyn 
> datasource::datasource::TableProvider + 'static)>`
>     = note: required because it appears within the type 
> `(std::string::String, std::sync::Arc<(dyn 
> datasource::datasource::TableProvider + 'static)>)`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, 
> std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
>     = note: required because it appears within the type 
> `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn 
> datasource::datasource::TableProvider + 'static)>, 
> std::collections::hash_map::RandomState>`
>     = note: required because it appears within the type 
> `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn 
> datasource::datasource::TableProvider + 'static)>>`
>     = note: required because it appears within the type 
> `execution::context::ExecutionContextState`
>     = note: required because it appears within the type 
> `execution::context::ExecutionContext`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for 
> `std::sync::Mutex<execution::context::ExecutionContext>`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for 
> `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
>     = note: required because it appears within the type 
> `[closure@datafusion/src/execution/context.rs:940:44: 944:14 
> ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
> error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot 
> be shared between threads safely
>    --> datafusion/src/execution/context.rs:940:30
>     |
> 940 |             .map(|ctx_clone| thread::spawn(move || {
>     |                              ^^^^^^^^^^^^^ `(dyn 
> datasource::datasource::TableProvider + 'static)` cannot be shared between 
> threads safely
>     | 
>    ::: 
> /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
>     |
> 616 |     F: Send + 'static,
>     |        ---- required by this bound in `std::thread::spawn`
>     |
>     = help: the trait `std::marker::Sync` is not implemented for `(dyn 
> datasource::datasource::TableProvider + 'static)`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for `std::sync::Arc<(dyn 
> datasource::datasource::TableProvider + 'static)>`
>     = note: required because it appears within the type 
> `(std::string::String, std::sync::Arc<(dyn 
> datasource::datasource::TableProvider + 'static)>)`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, 
> std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
>     = note: required because it appears within the type 
> `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn 
> datasource::datasource::TableProvider + 'static)>, 
> std::collections::hash_map::RandomState>`
>     = note: required because it appears within the type 
> `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn 
> datasource::datasource::TableProvider + 'static)>>`
>     = note: required because it appears within the type 
> `execution::context::ExecutionContextState`
>     = note: required because it appears within the type 
> `execution::context::ExecutionContext`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for 
> `std::sync::Mutex<execution::context::ExecutionContext>`
>     = note: required because of the requirements on the impl of 
> `std::marker::Send` for 
> `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
>     = note: required because it appears within the type 
> `[closure@datafusion/src/execution/context.rs:940:44: 944:14 
> ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
>    Compiling arrow-benchmarks v2.0.0-SNAPSHOT 
> (/Users/alamb/Software/arrow/rust/benchmarks)
> error: aborting due to 4 previous errors
> For more information about this error, try `rustc --explain E0277`.
> error: could not compile `datafusion`.
> To learn more, run the command again with --verbose.
> warning: build failed, waiting for other jobs to finish...
> error: build failed
> {code}
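
For readers unfamiliar with E0277, the errors above can be reproduced and fixed in miniature using only the standard library. The sketch below is not the DataFusion code or the contents of the pull request: `Provider`, `CsvProvider`, `Context` and `lookup_from_threads` are hypothetical stand-ins for TableProvider, a concrete table, ExecutionContext and the test in the diff. It assumes one possible remedy, which is to declare `Send + Sync` as supertraits so that every `dyn Provider` trait object can cross thread boundaries.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for a trait such as TableProvider.
// With the `Send + Sync` supertraits, every `dyn Provider` is Send + Sync.
// Removing them makes `thread::spawn` below fail with E0277, much like the
// errors quoted in the issue.
trait Provider: Send + Sync {
    fn name(&self) -> String;
}

// Hypothetical concrete provider.
struct CsvProvider;

impl Provider for CsvProvider {
    fn name(&self) -> String {
        "csv".to_string()
    }
}

// Hypothetical stand-in for ExecutionContext: a map of named trait objects.
struct Context {
    tables: HashMap<String, Arc<dyn Provider>>,
}

impl Context {
    // Look up a registered table and report its provider's name.
    fn describe(&self, table: &str) -> Option<String> {
        self.tables.get(table).map(|p| p.name())
    }
}

// Share one context between two threads behind Arc<Mutex<..>>, mirroring the
// shape of the test in the diff.
fn lookup_from_threads() -> Vec<Option<String>> {
    let mut tables: HashMap<String, Arc<dyn Provider>> = HashMap::new();
    tables.insert("test".to_string(), Arc::new(CsvProvider));
    let ctx = Arc::new(Mutex::new(Context { tables }));

    let handles: Vec<thread::JoinHandle<Option<String>>> = (0..2)
        .map(|_| Arc::clone(&ctx))
        .map(|ctx_clone| {
            thread::spawn(move || {
                // Each thread takes the lock, then reads from the context.
                let ctx = ctx_clone.lock().expect("lock poisoned");
                ctx.describe("test")
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().expect("thread panicked"))
        .collect()
}
```

Calling `lookup_from_threads()` from a `main` function returns two `Some("csv")` values, one per thread. The supertrait approach forces every implementor to be thread-safe; an alternative is to leave the trait unbounded and write `Arc<dyn Provider + Send + Sync>` at each use site, which is more flexible but more verbose.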



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
