We are working with use cases where we need to do batch processing on a large
number (hundreds of thousands) of Parquet files.  The processing is quite
similar per file.  There are many aggregates that are very SQL-friendly
(computing averages, maxima, minima, aggregations on single columns with
some selection criteria).  There is also some more advanced time-series
processing (continuous wavelet transforms and the like).  This all seems
like a good use case for Spark.
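
For concreteness, a typical per-file aggregate looks roughly like the
following.  This is only a sketch: the path, column names, and selection
criterion are made up, not our real schema.

import org.apache.spark.sql.functions.{avg, max, min}

// "status" and "sensor_value" are hypothetical column names.
val df = sqlContext.read.parquet("/data/example.parquet")
df.filter(df("status") === "OK")
  .agg(avg("sensor_value"), max("sensor_value"), min("sensor_value"))
  .show()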

But I'm having performance problems.  Let's take a look at something very
simple: a job that just checks whether the Parquet files are readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}

val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)

My understanding is that this doesn't work because sqlContext can't be used
inside a transformation like map (or inside an action); it only makes sense
on the driver.  Inside the closure it becomes a null reference, so all of
the reads fail.
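
A minimal illustration of what I mean (hypothetical paths, behavior as I
understand it):

// Fine: this runs on the driver, where sqlContext exists.
val one = sqlContext.read.parquet("/data/one.parquet")

// Not fine: the closure below is shipped to the executors, where the captured
// sqlContext reference is null, so every read fails at runtime.
// sc.parallelize(Seq("/data/a.parquet", "/data/b.parquet"))
//   .map(p => sqlContext.read.parquet(p).count())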

Code that works:

import scala.util.{Try, Success, Failure}

val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)


This works because collect() brings everything back to the driver, where the
sqlContext reference is valid, so the reads work fine.

But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
executors, 40 GB of RAM on the driver, and 19 GB per executor, and it takes
about 1 minute to check 100 Parquet files.  That is too long: at that rate,
the hundreds of thousands of files we need to process would take on the
order of days.

I realize it is possible to parallelize after the read:

import scala.util.{Try, Success, Failure}

val parquetFiles = sc.textFile("file_list.txt")
val intermediate_successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
val dist_successes = sc.parallelize(intermediate_successes)
val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)


But this does not improve performance substantially.  It seems the
bottleneck is that the reads themselves still happen sequentially on the
driver.
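
For illustration, here is a rough sketch of the kind of distributed check I
have in mind: it avoids sqlContext inside the transformation by reading only
the Parquet footer via the parquet-hadoop API.  This is an untested sketch;
I am assuming ParquetFileReader.readFooter(Configuration, Path) is available
on the executors' classpath (depending on the Parquet version the package
may be parquet.hadoop rather than org.apache.parquet.hadoop).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.util.Try

val parquetFiles = sc.textFile("file_list.txt")

val successes = parquetFiles.mapPartitions { paths =>
  // Build a Hadoop Configuration on the executor itself; on YARN the cluster's
  // core-site.xml / hdfs-site.xml should already be on the classpath (assumption).
  val conf = new Configuration()
  paths.map { p =>
    // Reading just the footer is enough to tell whether the file is valid
    // Parquet, and it does not need sqlContext, so it can run on the executors.
    (p, Try(ParquetFileReader.readFooter(conf, new Path(p))).isSuccess)
  }
}.filter(_._2).map(_._1)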

Is there a better way to do this?

Thanks,
Jordan



