What you could do is launch N Spark jobs in parallel from the driver, each reading one of the directories you supply with spark.read.parquet, for example. You would just have tens or hundreds of those jobs running at the same time. You have to write a bit of async code to do it, but it's pretty easy with Scala Futures.
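Roughly something like this (untested sketch; I'm assuming `list` is your Seq[String] of directory paths, and the compact helper, the pool size of 32, and the "_compacted" output path are just placeholders for whatever your compaction actually does):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.sql.SparkSession

// Hypothetical compaction: read one customer directory and rewrite it
// as a single file. Writing to a "_compacted" path is a placeholder --
// you can't safely overwrite a directory while you're reading from it.
def compact(spark: SparkSession, dir: String): Unit =
  spark.read.parquet(dir)
    .coalesce(1)
    .write.mode("overwrite")
    .parquet(dir + "_compacted")

// Bounded thread pool so e.g. 32 jobs run concurrently rather than one
// per directory; each Future submits an independent Spark job on the
// shared session.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(32))

val futures = list.map(dir => Future { compact(spark, dir) })
Await.result(Future.sequence(futures), Duration.Inf)

The concurrency is driven from the driver, but each job's actual work (read, coalesce, write) runs on the executors, so the driver threads spend nearly all their time just waiting on job completion.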
On Tue, May 25, 2021 at 3:31 PM Eric Beabes <mailinglist...@gmail.com> wrote:

> Here's the use case:
>
> We've a bunch of directories (over 1000), each containing tons of small
> files. Each directory is for a different customer, so they are independent
> in that respect. We need to merge all the small files in each directory
> into one (or a few) compacted file(s) by using a 'coalesce' function.
>
> Clearly we can do this on the Driver by doing something like:
>
> list.par.foreach(dir => compact(spark, dir))
>
> This works, but the problem here is that the parallelism happens on the
> Driver, which won't scale when we have 10,000 customers! At any given time
> there will be only as many compactions happening as the number of cores
> on the Driver, right?
>
> We were hoping to do this:
>
> val df = list.toDF()
> df.foreach(dir => compact(spark, dir))
>
> Our hope was that this would distribute the load amongst the Spark
> Executors and scale better. But this throws the NullPointerException
> shown in the original email.
>
> Is there a better way to do this?
>
> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>
>> Why not just read from Spark as normal? Do these files have different
>> or incompatible schemas?
>>
>> val df = spark.read.option("mergeSchema", "true").load(listOfPaths)
>>
>> From: Eric Beabes <mailinglist...@gmail.com>
>> Date: Tuesday, May 25, 2021 at 1:24 PM
>> To: spark-user <user@spark.apache.org>
>> Subject: Reading parquet files in parallel on the cluster
>>
>> I've a use case in which I need to read Parquet files in parallel from
>> over 1000 directories. I am doing something like this:
>>
>> val df = list.toList.toDF()
>>
>> df.foreach(c => {
>>   val config = getConfigs()
>>   doSomething(spark, config)
>> })
>>
>> In the doSomething method, when I try to do this:
>>
>> val df1 = spark.read.parquet(pathToRead).collect()
>>
>> I get the NullPointerException given below. It seems 'spark.read' only
>> works on the Driver, not on the cluster. How can I do what I want to do?
>> Please let me know. Thank you.
>>
>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9,
>> ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>> java.lang.NullPointerException
>>   at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>   at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>   at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>   at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)