[ https://issues.apache.org/jira/browse/ARROW-9888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andy Grove resolved ARROW-9888.
-------------------------------
    Fix Version/s: 2.0.0
       Resolution: Fixed

Issue resolved by pull request 8082
[https://github.com/apache/arrow/pull/8082]

> [Rust] [DataFusion] ExecutionContext can not be shared between threads
> ----------------------------------------------------------------------
>
>                 Key: ARROW-9888
>                 URL: https://issues.apache.org/jira/browse/ARROW-9888
>             Project: Apache Arrow
>          Issue Type: Bug
>            Reporter: Andrew Lamb
>            Assignee: Andrew Lamb
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> As suggested by Jorge on https://github.com/apache/arrow/pull/8079
> The high-level idea is to allow ExecutionContext to be used in multi-threaded
> environments such as Python.
> The two use cases:
> 1. When a project is planning a large number of plans that depend on a
> common set of sources and UDFs, it would be nice to be able to multi-thread
> the planning. This is particularly important when building the plans requires
> reading remote metadata (e.g. when the source is in S3 with many partitions).
> Metadata reading is often slow and network-bound, which makes threads well
> suited to these workloads. If multi-threading is not possible, either each
> plan needs to read the metadata independently (one context per plan) or
> planning must be sequential (with lots of network waiting).
> 2. When creating bindings for programming languages that support
> multi-threading, it would be nice for the ExecutionContext to be thread-safe,
> so that we can more easily integrate with those languages.
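Use case 1 can be illustrated with a minimal sketch in plain Rust. It does not use DataFusion at all: `SharedContext`, `TableMetadata` and `plan_concurrently` are hypothetical stand-ins, and the sketch assumes that planning only needs shared (`&self`) access to metadata that was read once up front. Because every field of the context is `Send + Sync`, the context can be wrapped in an `Arc` and handed to several planning threads without a `Mutex` and without re-reading the metadata for each plan.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for metadata that is slow to fetch, such as the
// partition listing of a table stored in S3. Not a DataFusion type.
struct TableMetadata {
    partitions: Vec<String>,
}

// Hypothetical shared context: the metadata is read once, then only read.
// Every field is Send + Sync, so the whole struct is Send + Sync too.
struct SharedContext {
    tables: HashMap<String, TableMetadata>,
}

impl SharedContext {
    // Planning only needs `&self`, so many threads can call it at the same
    // time through an `Arc` without taking a lock.
    fn plan(&self, table: &str) -> Result<String, String> {
        let meta = self
            .tables
            .get(table)
            .ok_or_else(|| format!("unknown table {}", table))?;
        Ok(format!("scan {} over {} partitions", table, meta.partitions.len()))
    }
}

// Plans one query per table name, each on its own thread, against the same
// shared context. Results come back in the order of `tables`.
fn plan_concurrently(ctx: Arc<SharedContext>, tables: Vec<&'static str>) -> Vec<Result<String, String>> {
    let handles: Vec<thread::JoinHandle<Result<String, String>>> = tables
        .into_iter()
        .map(|table| {
            let ctx = Arc::clone(&ctx);
            thread::spawn(move || ctx.plan(table))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("planning thread panicked"))
        .collect()
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert(
        "sales".to_string(),
        TableMetadata { partitions: vec!["p0".to_string(), "p1".to_string()] },
    );
    let ctx = Arc::new(SharedContext { tables });
    for result in plan_concurrently(ctx, vec!["sales", "missing"]) {
        println!("{:?}", result);
    }
}
```

The design choice here is to keep planning read-only: once the expensive metadata lives in one shared value, adding threads does not multiply the network round trips.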
> The code might look like:
> {code}
> alamb@MacBook-Pro rust % git diff
> diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
> index 5f8aa342e..7374b0a78 100644
> --- a/rust/datafusion/src/execution/context.rs
> +++ b/rust/datafusion/src/execution/context.rs
> @@ -460,7 +460,7 @@ mod tests {
>      use arrow::array::{ArrayRef, Int32Array};
>      use arrow::compute::add;
>      use std::fs::File;
> -    use std::io::prelude::*;
> +    use std::{sync::Mutex, io::prelude::*};
>      use tempdir::TempDir;
>      use test::*;
>
> @@ -928,6 +928,28 @@ mod tests {
>          Ok(())
>      }
>
> +    #[test]
> +    fn send_context_to_threads() -> Result<()> {
> +        // ensure that an ExecutionContext can be shared across multiple threads
> +        let tmp_dir = TempDir::new("send_context_to_threads")?;
> +        let partition_count = 4;
> +        let ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count)?));
> +
> +        let threads: Vec<JoinHandle<Result<_>>> = (0..2)
> +            .map(|_| { ctx.clone() })
> +            .map(|ctx_clone| thread::spawn(move || {
> +                let ctx = ctx_clone.lock().expect("Locked context");
> +                // Ensure we can create a logical plan on a separate thread.
> +                ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
> +            }))
> +            .collect();
> +
> +        for thread in threads {
> +            thread.join().expect("Failed to join thread")?;
> +        }
> +        Ok(())
> +    }
> +
>      #[test]
>      fn scalar_udf() -> Result<()> {
>          let schema = Schema::new(vec![
> {code}
> At the moment, Rust refuses to compile this example (and, more generally,
> refuses to share ExecutionContexts between threads) because several `dyn`
> trait objects inside ExecutionContext are not marked as `Send + Sync`:
> {code}
>    Compiling datafusion v2.0.0-SNAPSHOT (/Users/alamb/Software/arrow/rust/datafusion)
> error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between threads safely
>    --> datafusion/src/execution/context.rs:940:30
>     |
> 940 |             .map(|ctx_clone| thread::spawn(move || {
>     |                              ^^^^^^^^^^^^^ `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between threads safely
>     |
>    ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
>     |
> 616 |     F: Send + 'static,
>     |        ---- required by this bound in `std::thread::spawn`
>     |
>     = help: the trait `std::marker::Send` is not implemented for `(dyn execution::physical_plan::PhysicalPlanner + 'static)`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>`
>     = note: required because it appears within the type `std::option::Option<std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>>`
>     = note: required because it appears within the type `execution::context::ExecutionConfig`
>     = note: required because it appears within the type `execution::context::ExecutionContextState`
>     = note: required because it appears within the type `execution::context::ExecutionContext`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
>     = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
>
> error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared between threads safely
>    --> datafusion/src/execution/context.rs:940:30
>     |
> 940 |             .map(|ctx_clone| thread::spawn(move || {
>     |                              ^^^^^^^^^^^^^ `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared between threads safely
>     |
>    ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
>     |
> 616 |     F: Send + 'static,
>     |        ---- required by this bound in `std::thread::spawn`
>     |
>     = help: the trait `std::marker::Sync` is not implemented for `(dyn execution::physical_plan::PhysicalPlanner + 'static)`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>`
>     = note: required because it appears within the type `std::option::Option<std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>>`
>     = note: required because it appears within the type `execution::context::ExecutionConfig`
>     = note: required because it appears within the type `execution::context::ExecutionContextState`
>     = note: required because it appears within the type `execution::context::ExecutionContext`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
>     = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
>
> error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot be sent between threads safely
>    --> datafusion/src/execution/context.rs:940:30
>     |
> 940 |             .map(|ctx_clone| thread::spawn(move || {
>     |                              ^^^^^^^^^^^^^ `(dyn datasource::datasource::TableProvider + 'static)` cannot be sent between threads safely
>     |
>    ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
>     |
> 616 |     F: Send + 'static,
>     |        ---- required by this bound in `std::thread::spawn`
>     |
>     = help: the trait `std::marker::Send` is not implemented for `(dyn datasource::datasource::TableProvider + 'static)`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>`
>     = note: required because it appears within the type `(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
>     = note: required because it appears within the type `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>, std::collections::hash_map::RandomState>`
>     = note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>>`
>     = note: required because it appears within the type `execution::context::ExecutionContextState`
>     = note: required because it appears within the type `execution::context::ExecutionContext`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
>     = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
>
> error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot be shared between threads safely
>    --> datafusion/src/execution/context.rs:940:30
>     |
> 940 |             .map(|ctx_clone| thread::spawn(move || {
>     |                              ^^^^^^^^^^^^^ `(dyn datasource::datasource::TableProvider + 'static)` cannot be shared between threads safely
>     |
>    ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
>     |
> 616 |     F: Send + 'static,
>     |        ---- required by this bound in `std::thread::spawn`
>     |
>     = help: the trait `std::marker::Sync` is not implemented for `(dyn datasource::datasource::TableProvider + 'static)`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>`
>     = note: required because it appears within the type `(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
>     = note: required because it appears within the type `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>, std::collections::hash_map::RandomState>`
>     = note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>>`
>     = note: required because it appears within the type `execution::context::ExecutionContextState`
>     = note: required because it appears within the type `execution::context::ExecutionContext`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
>     = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
>     = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
>
>    Compiling arrow-benchmarks v2.0.0-SNAPSHOT (/Users/alamb/Software/arrow/rust/benchmarks)
> error: aborting due to 4 previous errors
>
> For more information about this error, try `rustc --explain E0277`.
> error: could not compile `datafusion`.
>
> To learn more, run the command again with --verbose.
> warning: build failed, waiting for other jobs to finish...
> error: build failed
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
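The compiler errors in ARROW-9888 all trace back to two trait objects, `dyn PhysicalPlanner` and `dyn TableProvider`, stored as `Arc<dyn ...>` inside the context state. One common way to make such a struct shareable is to declare `Send + Sync` as supertraits, so that every trait object of that type is thread-safe by construction. The sketch below assumes that approach; it is not necessarily the exact change made in pull request 8082, and the traits, `MemTable`, `DefaultPlanner`, `ContextState` and `assert_send_sync` are simplified hypothetical stand-ins rather than DataFusion's real definitions.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

// Hypothetical, simplified versions of the two traits named in the errors.
// The `Send + Sync` supertraits are the point: every `dyn TableProvider`
// and `dyn PhysicalPlanner` is now thread-safe by construction.
trait TableProvider: Send + Sync {
    fn name(&self) -> &str;
}

trait PhysicalPlanner: Send + Sync {
    fn describe(&self) -> String;
}

// Hypothetical implementations, used only to populate the state below.
struct MemTable {
    name: String,
}

impl TableProvider for MemTable {
    fn name(&self) -> &str {
        &self.name
    }
}

struct DefaultPlanner;

impl PhysicalPlanner for DefaultPlanner {
    fn describe(&self) -> String {
        "default planner".to_string()
    }
}

// Mirrors the shape reported in the error chain: a map of
// `Arc<dyn TableProvider>` plus an optional `Arc<dyn PhysicalPlanner>`.
struct ContextState {
    datasources: HashMap<String, Arc<dyn TableProvider>>,
    query_planner: Option<Arc<dyn PhysicalPlanner>>,
}

// Compile-time check: a call to this only compiles if `T: Send + Sync`.
fn assert_send_sync<T: Send + Sync>() {}

// Reads the shared state from a spawned thread. Without the supertraits,
// `thread::spawn` here would be rejected with E0277.
fn describe_from_thread(state: Arc<ContextState>) -> Vec<String> {
    thread::spawn(move || {
        let mut out: Vec<String> = state
            .datasources
            .values()
            .map(|provider| provider.name().to_string())
            .collect();
        out.sort();
        if let Some(planner) = &state.query_planner {
            out.push(planner.describe());
        }
        out
    })
    .join()
    .expect("worker thread panicked")
}

fn main() {
    assert_send_sync::<ContextState>();
    let mut datasources: HashMap<String, Arc<dyn TableProvider>> = HashMap::new();
    datasources.insert("test".to_string(), Arc::new(MemTable { name: "test".to_string() }));
    let state = Arc::new(ContextState {
        datasources,
        query_planner: Some(Arc::new(DefaultPlanner)),
    });
    println!("{:?}", describe_from_thread(state));
}
```

Putting the bound on the trait rather than on each use site (`Arc<dyn TableProvider + Send + Sync>`) keeps every field and function signature unchanged, at the cost of requiring all implementors to be thread-safe.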