What you could do is launch N Spark jobs in parallel from the driver. Each
one would process one directory that you supply to spark.read.parquet, for
example, and you could have tens or hundreds of those jobs running at the
same time. You have to write a bit of async code to do it, but it's
straightforward with Scala Futures.
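
A rough sketch of that approach (a sketch only: dirs stands for your list of
directory paths, compact(spark, dir) for whatever does the actual coalesce
and write, and the pool size of 32 is just an example):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Bounded thread pool so we don't kick off all 10,000 jobs at once
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(32))

// Each Future submits an independent Spark job from the driver
val futures = dirs.map { dir =>
  Future {
    compact(spark, dir)
  }
}

// Block until every compaction job has finished
Await.result(Future.sequence(futures), Duration.Inf)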

On Tue, May 25, 2021 at 3:31 PM Eric Beabes <mailinglist...@gmail.com>
wrote:

> Here's the use case:
>
> We have a bunch of directories (over 1,000), each containing a large number
> of small files. Each directory is for a different customer, so they are
> independent in that respect. We need to merge all the small files in each
> directory into one (or a few) compacted file(s) by using a 'coalesce' function.
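>
> A minimal sketch of what such a compact function could look like (hypothetical
> names; assumes Parquet in and out, and that writing the compacted copy to a
> separate path is acceptable):
>
> import org.apache.spark.sql.SparkSession
>
> def compact(spark: SparkSession, dir: String): Unit = {
>   spark.read.parquet(dir)            // read all the small files in this directory
>     .coalesce(1)                     // collapse them into a single partition
>     .write
>     .mode("overwrite")
>     .parquet(dir + "_compacted")     // can't overwrite the path being read, so write elsewhere
> }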
>
> Clearly we can do this on the Driver by doing something like:
>
> list.par.foreach(dir => compact(spark, dir))
>
> This works, but the problem is that the parallelism happens on the Driver,
> which won't scale when we have 10,000 customers! At any given time there will
> be only as many compactions running as there are cores on the Driver, right?
>
> We were hoping to do this:
>
> val df = list.toDF()
> df.foreach(dir => compact(spark, dir))
>
> Our hope was that this would distribute the load among the Spark Executors and
> scale better. But this throws the NullPointerException shown in the
> original email.
>
> Is there a better way to do this?
>
>
> On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Why not just read from Spark as normal? Do these files have different or
>> incompatible schemas?
>>
>> val df = spark.read.option("mergeSchema", "true").load(listOfPaths)
>>
>> From: Eric Beabes <mailinglist...@gmail.com>
>> Date: Tuesday, May 25, 2021 at 1:24 PM
>> To: spark-user <user@spark.apache.org>
>> Subject: Reading parquet files in parallel on the cluster
>>
>> I have a use case in which I need to read Parquet files in parallel from
>> over 1,000 directories. I am doing something like this:
>>
>>     val df = list.toList.toDF()
>>     df.foreach(c => {
>>       val config = getConfigs()
>>       doSomething(spark, config)
>>     })
>>
>> In the doSomething method, when I try to do this:
>>
>> val df1 = spark.read.parquet(pathToRead).collect()
>>
>> I get the NullPointerException shown below. It seems 'spark.read' only
>> works on the Driver, not on the Executors. How can I do what I want to do?
>> Please let me know. Thank you.
>>
>> 21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9,
>> ip-10-0-5-3.us-west-2.compute.internal, executor 11):
>> java.lang.NullPointerException
>>         at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
>>         at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
>>         at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
>>         at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)
