Hi Matthew,
Using the latest DataFusion from the GitHub master branch, the following code
works for the in-memory case:
use std::sync::Arc;
use std::time::Instant;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::datasource::MemTable;

#[tokio::main]
async fn main() -> Result<()> {
    // TODO add command-line args
    let ratings_csv = "/tmp/movies/ratings_small.csv";

    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();

    let batches = vec![df.collect().await?];
    let provider =
        MemTable::new(Arc::new(df.schema().to_owned().into()), batches)?;
    ctx.register_table("memory_table", Box::new(provider));

    let mem_df = ctx.table("memory_table")?;

    let q_start = Instant::now();
    let _results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
    println!("Duration: {:?}", q_start.elapsed());

    Ok(())
}
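
The part that differs from your earlier snippet is the schema argument:
MemTable::new takes an Arrow schema (a SchemaRef), while schema() on the
DataFrame returns a different schema type, so the to_owned().into()
conversion plus the Arc::new is what bridges the two. Spelled out as a
sketch (assuming the `df` from the code above and
arrow::datatypes::{Schema, SchemaRef}):

```
use arrow::datatypes::{Schema, SchemaRef};
use std::sync::Arc;

// Convert the DataFrame's own schema into the Arrow schema that
// MemTable::new expects, then wrap it in an Arc (SchemaRef).
let arrow_schema: Schema = df.schema().to_owned().into();
let schema_ref: SchemaRef = Arc::new(arrow_schema);
```
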
Andy.
On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <[email protected]>
wrote:
> Played around some more - it was because I wasn't using the --release flag.
> Sorry about that, still learning Rust.
>
> Using that flag, the total time to read and filter is between 52 and 80ms.
>
> In general, what should I expect when comparing the performance of pandas
> to DataFusion?
>
> @Andy Grove thanks for adding that. If there is a need for additional
> DataFusion benchmarks and what I'm doing could help with that, I would be
> happy to contribute it. I will send a follow-up once I've made progress.
>
> I'm also still having trouble with that memory table, so any help there is
> appreciated.
>
> Thanks for your time! Very excited by this.
>
> Matthew M. Turner
> Email: [email protected]
> Phone: (908)-868-2786
>
> -----Original Message-----
> From: Matthew Turner <[email protected]>
> Sent: Friday, December 11, 2020 12:24 AM
> To: [email protected]
> Subject: RE: [Rust] DataFusion performance
>
> Thanks for context! Makes sense.
>
> Even with that, when comparing the total time of each (read + filter),
> DataFusion still appears much slower (~625 ms vs. 33 ms). Is that expected?
>
> Also, I'm now trying to bring the table into memory, perform the
> computation from there, and compare performance. Code below. But I'm
> getting an error (beneath the code) even though I think I've constructed the
> MemTable correctly (from [1]). From what I see, all the types are the same
> as when I used the original df from read_csv, so I'm not sure what I'm doing
> wrong.
>
> I also saw there is an open issue [2] for this error type on the rust-lang
> tracker, so I'm unsure whether it's my implementation, a DataFusion/Arrow
> issue, or a Rust issue.
>
> Thanks again for help!
>
> ```
> let sch = Arc::new(df.schema());
> let batches = vec![df.collect().await?];
> let provider = MemTable::new(sch, batches)?;
>
> ctx.register_table("memory_table", Box::new(provider));
>
> let mem_df = ctx.table("memory_table")?;
>
> let q_start = Instant::now();
> let results = mem_df
> .filter(col("userId").eq(lit(1)))?
> .collect()
> .await
> .unwrap();
> ```
>
> Which is returning this error:
>
> error[E0698]: type inside `async` block must be known in this context
> --> src\main.rs:37:38
> |
> 37 | .filter(col("userId").eq(lit(1)))?
> |                                      ^ cannot infer type for type `{integer}`
> |
> note: the type is part of the `async` block because of this `await`
> --> src\main.rs:36:19
> |
> 36 | let results = mem_df
> | ___________________^
> 37 | | .filter(col("userId").eq(lit(1)))?
> 38 | | .collect()
> 39 | | .await
> | |______________^
>
>
> [1]
> https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
> [2] https://github.com/rust-lang/rust/issues/63502
>
> Matthew M. Turner
> Email: [email protected]
> Phone: (908)-868-2786
>
> -----Original Message-----
> From: Michael Mior <[email protected]>
> Sent: Thursday, December 10, 2020 8:55 PM
> To: [email protected]
> Subject: Re: [Rust] DataFusion performance
>
> Contrary to what you might expect given the name, read_csv does not
> actually read the CSV file. It instead creates the start of a logical
> execution plan which involves reading the CSV file when that plan is
> finally executed. This happens when you call collect().
>
> Pandas read_csv on the other hand immediately reads the CSV file. So
> you're comparing the time of reading AND filtering the file
> (DataFusion) with the time to filter data which has already been read
> (Pandas).
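>
> To make the distinction concrete, here is a minimal sketch (it only
> rearranges calls already shown in this thread; the single timer below
> covers read + filter together, because that is when the work happens):
>
> ```
> use datafusion::error::Result;
> use datafusion::prelude::*;
> use std::time::Instant;
>
> #[tokio::main]
> async fn main() -> Result<()> {
>     let mut ctx = ExecutionContext::new();
>
>     // This only builds a logical plan; no CSV bytes are read yet.
>     let df = ctx.read_csv("ratings_small.csv", CsvReadOptions::new())?;
>
>     // The scan, the filter, and the materialization all run inside
>     // collect(), so this one timer measures read + filter together.
>     let t = Instant::now();
>     let _batches = df.filter(col("userId").eq(lit(1)))?.collect().await?;
>     println!("read + filter: {:?}", t.elapsed());
>
>     Ok(())
> }
> ```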
>
> There's nothing wrong with your use of DataFusion per se, you simply
> weren't measuring what you thought you were measuring.
> --
> Michael Mior
> [email protected]
>
> Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <[email protected]>
> a écrit :
> >
> > Hello,
> >
> >
> >
> > I've been playing around with DataFusion to explore the feasibility of
> replacing current Python/pandas data processing jobs with Rust/DataFusion.
> Ultimately, I'm looking to improve performance and decrease cost.
> >
> >
> >
> > I was doing some simple tests to start measuring the performance
> difference on a simple task (read a CSV [1] and filter it).
> >
> >
> >
> > Reading the CSV, DataFusion seemed to outperform pandas by around 30%,
> which was nice.
> >
> > * Rust took around 20-25 ms to read the CSV (compared to 32 ms from
> > pandas).
> >
> >
> >
> > However, when filtering the data I was surprised to see that pandas was
> way faster.
> >
> > * Rust took around 500-600 ms to filter the CSV (compared to 1 ms from
> > pandas).
> >
> >
> >
> > My code for each is below. I know I should be running the DataFusion
> timings through something similar to Python's %timeit, but I didn't have
> that immediately accessible, and I ran the code many times to confirm the
> results were roughly consistent.
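> >
> > (A rough way to get %timeit-style repeated measurements without pulling
> > in a benchmark harness; this is only a sketch built from the same calls
> > as the code below, and a crate like criterion would be the more rigorous
> > option.)
> >
> > ```
> > use datafusion::error::Result;
> > use datafusion::prelude::*;
> > use std::time::{Duration, Instant};
> >
> > #[tokio::main]
> > async fn main() -> Result<()> {
> >     let mut ctx = ExecutionContext::new();
> >     let df = ctx.read_csv("ratings_small.csv", CsvReadOptions::new())?;
> >
> >     // Time one full execution of the query per iteration and keep every
> >     // sample, roughly what %timeit does (it reports mean +/- std dev).
> >     let runs = 10usize;
> >     let mut samples = Vec::with_capacity(runs);
> >     for _ in 0..runs {
> >         let t = Instant::now();
> >         let _ = df.filter(col("userId").eq(lit(1)))?.collect().await?;
> >         samples.push(t.elapsed());
> >     }
> >
> >     let total: Duration = samples.iter().sum();
> >     println!("min:  {:?}", samples.iter().min().unwrap());
> >     println!("mean: {:?}", total / runs as u32);
> >     Ok(())
> > }
> > ```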
> >
> >
> >
> > Is this performance expected? Or am I using DataFusion incorrectly?
> >
> >
> >
> > Any insight is much appreciated!
> >
> >
> >
> > [Rust]
> >
> > ```
> >
> > use datafusion::error::Result;
> >
> > use datafusion::prelude::*;
> >
> > use std::time::Instant;
> >
> >
> >
> > #[tokio::main]
> >
> > async fn main() -> Result<()> {
> >
> > let start = Instant::now();
> >
> >
> >
> > let mut ctx = ExecutionContext::new();
> >
> >
> >
> > let ratings_csv = "ratings_small.csv";
> >
> >
> >
> > let df = ctx.read_csv(ratings_csv,
> > CsvReadOptions::new()).unwrap();
> >
> > println!("Read CSV Duration: {:?}", start.elapsed());
> >
> >
> >
> > let q_start = Instant::now();
> >
> > let results = df
> >
> > .filter(col("userId").eq(lit(1)))?
> >
> > .collect()
> >
> > .await
> >
> > .unwrap();
> >
> > println!("Filter duration: {:?}", q_start.elapsed());
> >
> >
> >
> > println!("Duration: {:?}", start.elapsed());
> >
> >
> >
> > Ok(())
> >
> > }
> >
> > ```
> >
> >
> >
> > [Python]
> >
> > ```
> >
> > In [1]: df = pd.read_csv("ratings_small.csv")
> >
> > 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
> >
> >
> >
> > In [2]: df.query("userId==1")
> >
> > 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops
> > each)
> >
> > ```
> >
> >
> >
> > [1]:
> > https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv
> >
> >
> >
> >
> >
> > Matthew M. Turner
> >
> > Email: [email protected]
> >
> > Phone: (908)-868-2786
> >
> >
>