We are working with use cases where we need to do batch processing on a large number (hundreds of thousands) of Parquet files. The processing is quite similar per file. Many of the aggregates are very SQL-friendly (computing averages, maxima, and minima, and other aggregations on single columns with some selection criteria). There is also some processing that is more advanced time-series work (continuous wavelet transforms and the like). This all seems like a good use case for Spark.
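To make the "SQL-friendly" part concrete, here is a rough sketch of the kind of per-file aggregate I mean (the path and the column names "sensor_id" and "value" are just placeholders, not our real schema):

import org.apache.spark.sql.functions.{avg, max, min}

// Illustrative only: open one file and compute simple aggregates
// on a single column, with a selection criterion applied first.
val df = sqlContext.read.parquet("/path/to/one_file.parquet")
val stats = df.filter(df("sensor_id") === 42)
  .agg(avg("value"), max("value"), min("value"))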
But I'm having performance problems. Let's take a look at something very simple, which just checks whether the Parquet files are readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}

// Read the list of file paths, then try to open each one inside a transformation
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(_._1)

My understanding is that this doesn't work because sqlContext can't be used inside a transformation like map (or inside an action); it only makes sense on the driver. It therefore becomes a null reference in the code above, so every read fails.

Code that works:

import scala.util.{Try, Success, Failure}

// collect() pulls the path list back to the driver, so the reads run there
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(_._1)

This works because the collect() means everything happens back on the driver, where the sqlContext object is valid. But it is slow. I'm using yarn-client mode on a 6-node cluster with 17 executors, 40 GB of RAM on the driver and 19 GB per executor, and it takes about one minute to run for 100 Parquet files. That is too long; recall that we need to do this across hundreds of thousands of files.

I realize it is possible to parallelize after the read (the intermediate result is what gets parallelized):

import scala.util.{Try, Success, Failure}

val parquetFiles = sc.textFile("file_list.txt")
val intermediate_successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
// Distribute only after the driver has already done all the reads
val dist_successes = sc.parallelize(intermediate_successes)
val successes = dist_successes.filter(_._2.isSuccess).map(_._1)

But this does not improve performance substantially. It seems the bottleneck is that the reads happen sequentially on the driver.

Is there a better way to do this?

Thanks,
Jordan
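P.S. To illustrate what I mean by the reads happening sequentially: the only workaround I've sketched so far is to push the driver-side readability checks through a Scala parallel collection, something like the snippet below. I'm not certain that sqlContext is safe to call from multiple driver threads, so please treat this as a sketch of the idea rather than a working solution.

import scala.util.Try

// Same Try-based check as above, but over a parallel collection so the
// driver-side reads are no longer strictly one-at-a-time.
val paths = sc.textFile("file_list.txt").collect()
val readable = paths.par
  .map(p => (p, Try(sqlContext.read.parquet(p))))
  .filter(_._2.isSuccess)
  .map(_._1)
  .seq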