Re: [EXTERNAL] Re: Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-06 Thread Shay Elbaz
I don't think there is a definitive right or wrong approach here. The SLS 
feature would not have been added to Spark if there were no real need for it, 
and AFAIK it required quite a bit of refactoring of Spark internals. So I'm 
sure this discussion has already taken place in the developer community :)

In my specific case, I also need it for interactive dev/research sessions in 
Jupyter notebooks, and it makes more sense to switch resources than to stop 
the session and start a new one (over and over again).

Shay

From: ayan guha 
Sent: Sunday, November 6, 2022 4:19 PM
To: Shay Elbaz 
Cc: Artemis User ; Tom Graves ; 
Tom Graves ; user@spark.apache.org 

Subject: [EXTERNAL] Re: Re: Re: Re: Re: Stage level scheduling - lower the 
number of executors when using GPUs




May I ask why the ETL job and the DL (assuming you mean deep learning here) task 
cannot be run as two separate Spark jobs?

IMHO it is better practice to split up the entire pipeline into logical steps and 
orchestrate them.

That way you can pick the profile you need for two very different types of 
workloads.

Ayan

On Sun, 6 Nov 2022 at 12:04 am, Shay Elbaz <shay.el...@gm.com> wrote:
Consider this:

  1.  The application is allowed to use only 20 GPUs.
  2.  To ensure exactly 20 GPUs, I use the 
df.rdd.repartition(20).withResources(gpus.build).mapPartitions(func) technique. 
(maxExecutors >> 20).
  3.  Given the volume of the input data, it takes 20 hours total to run the DL 
part (computer vision) on 20 GPUs, or 1 hour per GPU task.

Normally, I would repartition to 200 partitions to get finer-grained ~6-minute 
tasks instead of 1-hour tasks. But here we're "forced" to use only 20 
partitions. To be clear, I'm only referring to potential failures/lags here. 
The job needs at least 20 hours total (on 20 GPUs) no matter what, but if any 
task fails after 50 minutes, for example, we have to re-process those 50 
minutes again. Or if a task/executor lags behind due to environment issues, 
then speculative execution will only trigger another task after 1 hour. These 
issues would be avoided if we used 200 partitions, but then Spark would try to 
allocate more than 20 GPUs.
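
For concreteness, here is a minimal PySpark sketch of the technique from item 2, 
assuming Spark 3.1+ with dynamic allocation, a cluster where the "gpu" resource 
is discoverable, and df being the output of the ETL part; run_inference is an 
illustrative placeholder for the computer-vision function:

    from pyspark.resource import (ExecutorResourceRequests, TaskResourceRequests,
                                  ResourceProfileBuilder)

    # GPU profile for the DL stage only: 1 GPU per executor and per task.
    ereqs = ExecutorResourceRequests().cores(2).resource("gpu", 1)
    treqs = TaskResourceRequests().cpus(1).resource("gpu", 1)
    gpus = ResourceProfileBuilder().require(ereqs).require(treqs)

    def run_inference(rows):
        # Placeholder: load the model once per partition, then score the rows.
        yield from rows

    # The ETL stages run with the default (CPU) profile; repartition(20) caps the
    # DL stage at 20 concurrent tasks, hence at most 20 GPUs, even though
    # maxExecutors is much larger.
    result = (df.rdd
                .repartition(20)
                .withResources(gpus.build)
                .mapPartitions(run_inference))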

I hope that was more clear.
Thank you very much for helping.

Shay


From: Tom Graves <tgraves...@yahoo.com>
Sent: Friday, November 4, 2022 4:19 PM
To: Tom Graves; Artemis User <arte...@dtechspace.com>; user@spark.apache.org; 
Shay Elbaz <shay.el...@gm.com>
Subject: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number 
of executors when using GPUs




So I'm not sure I completely follow. Are you asking for a way to change the 
limit without having to do the repartition?  And your DL software doesn't care 
if you got, say, 30 executors instead of 20?  Normally I would expect the number 
of partitions at that point to be 200 (or whatever you set for your shuffle 
partitions) unless you are using AQE coalescing partitions functionality, and 
then it could change. Are you using the latter?
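
For reference, a hedged sketch of the knobs referred to here; the conf names are 
standard Spark SQL confs and the values are illustrative only:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             # default number of post-shuffle partitions (the "200" above)
             .config("spark.sql.shuffle.partitions", "200")
             # AQE and its partition-coalescing feature
             .config("spark.sql.adaptive.enabled", "true")
             .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
             .getOrCreate())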

> Normally I try to aim for anything between 30s-5m per task (failure-wise), 
> depending on the cluster, its stability, etc. But in this case, individual 
> tasks can take 30-60 minutes, if not much more. Any failure during this long 
> time is pretty expensive.

Are you saying that when you manually do the repartition your DL tasks take 30-60 
minutes? So again, you want something like AQE coalesce partitions to kick in to 
attempt to pick partition sizes for you?


Tom

On Thursday, November 3, 2022 at 03:18:07 PM CDT, Shay Elbaz 
<shay.el...@gm.com> wrote:


This is exactly what we ended up doing! The only drawback I saw with this 
approach is that the GPU tasks get pretty big (in terms of data and compute 
time), and task failures become expensive. That's why I reached out to the 
mailing list in the first place.
Normally I try to aim for anything between 30s-5m per task (failure-wise), 
depending on the cluster, its stability, etc. But in this case, individual 
tasks can take 30-60 minutes, if not much more. Any failure during this long 
time is pretty expensive.
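
As a side note on the failure cost and the speculative-execution timing discussed 
in this thread, a hedged sketch of the confs that control when a speculative copy 
of a straggling task is launched (standard Spark conf names, illustrative values):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config("spark.speculation", "true")
             # a task becomes a speculation candidate only after this fraction
             # of the stage's tasks have finished...
             .config("spark.speculation.quantile", "0.75")
             # ...and only if it is this many times slower than the median task
             .config("spark.speculation.multiplier", "1.5")
             .getOrCreate())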


Shay

From: Tom Graves 
Sent: Thursday, November 3, 2022 7:56 PM
To: Artemis User <arte...@dtechspace.com>; user@spark.apache.org; 
Shay Elbaz <shay.el...@gm.com>
Subject: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs




Stage level scheduling does not allow you to change configs right now. This is 
something we thought about as a follow-on but have never implemented. How many 
tasks on the DL stage are you running? The typical case is: run some ETL with 
lots of tasks, do mapPartitions, and then run your DL stuff; before that 
mapPartitions you could do a repartition if necessary to get to exactly the 
number of tasks you want (20). That way, even if maxExecutors=500, you will 
only ever need 20 (or whatever you repartition to) and Spark isn't going to 
ask for more than that.

Tom

Re: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-06 Thread ayan guha
May I ask why the ETL job and the DL (assuming you mean deep learning here)
task cannot be run as two separate Spark jobs?

IMHO it is better practice to split up the entire pipeline into logical steps
and orchestrate them.

That way you can pick the profile you need for two very different types of
workloads.

Ayan

On Sun, 6 Nov 2022 at 12:04 am, Shay Elbaz  wrote:

> Consider this:
>
>1. The application is allowed to use only 20 GPUs.
>    2. To ensure exactly 20 GPUs, I use the
>    df.rdd.repartition(20).withResources(gpus.build).mapPartitions(func)
>    technique. (maxExecutors >> 20).
>    3. Given the volume of the input data, it takes 20 hours total to
>    run the DL part (computer vision) on 20 GPUs, or 1 hour per GPU task.
>
> Normally, I would repartition to 200 partitions to get finer-grained ~6-minute
> tasks instead of 1 hour. But here we're "forced" to use only 20
> partitions. To be clear, I'm only referring to potential failures/lags
> here. The job needs at least 20 hours total (on 20 GPUs) no matter what,
> but if any task fails after 50 minutes for example, we have to re-process
> these 50 minutes again. Or if a task/executor lags behind due to
> environment issues, then speculative execution will only trigger another
> task after 1 hour. These issues would be avoided if we used 200 partitions,
> but then Spark will try to allocate more than 20 GPUs.
>
> I hope that was more clear.
> Thank you very much for helping.
>
> Shay
>
> --
> From: Tom Graves 
> Sent: Friday, November 4, 2022 4:19 PM
> To: Tom Graves; Artemis User <arte...@dtechspace.com>; user@spark.apache.org; 
> Shay Elbaz 
> Subject: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the
> number of executors when using GPUs
>
>
>
>
> So I'm not sure I completely follow. Are you asking for a way to change
> the limit without having to do the repartition?  And your DL software
> doesn't care if you got, say, 30 executors instead of 20?  Normally I would
> expect the number of partitions at that point to be 200 (or whatever you
> set for your shuffle partitions) unless you are using AQE coalescing
> partitions functionality and then it could change. Are you using the latter?
>
> > Normally I try to aim for anything between 30s-5m per
> task (failure-wise), depending on the cluster, its stability, etc. But
> in this case, individual tasks can take 30-60 minutes, if not much more.
> Any failure during this long time is pretty expensive.
>
> Are you saying that when you manually do the repartition your DL tasks take
> 30-60 minutes? So again, you want something like AQE coalesce partitions to
> kick in to attempt to pick partition sizes for you?
>
>
> Tom
>
> On Thursday, November 3, 2022 at 03:18:07 PM CDT, Shay Elbaz <
> shay.el...@gm.com> wrote:
>
>
> This is exactly what we ended up doing! The only drawback I saw with this
> approach is that the GPU tasks get pretty big (in terms of data and compute
> time), and task failures become expensive. That's why I reached out to the
> mailing list in the first place.
> Normally I try to aim for anything between 30s-5m per
> task (failure-wise), depending on the cluster, its stability, etc. But
> in this case, individual tasks can take 30-60 minutes, if not much more.
> Any failure during this long time is pretty expensive.
>
>
> Shay
> --
> From: Tom Graves 
> Sent: Thursday, November 3, 2022 7:56 PM
> To: Artemis User; user@spark.apache.org; Shay Elbaz 
> Subject: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the
> number of executors when using GPUs
>
>
>
>
> Stage level scheduling does not allow you to change configs right now.
> This is something we thought about as follow on but have never
> implemented.  How many tasks on the DL stage are you running?  The typical
> case is: run some ETL with lots of tasks, do mapPartitions, and then run your
> DL stuff; before that mapPartitions you could do a repartition if necessary to
> get to exactly the number of tasks you want (20).  That way, even if
> maxExecutors=500, you will only ever need 20 (or whatever you repartition to)
> and Spark isn't going to ask for more than that.
>
> Tom
>
> On Thursday, November 3, 2022 at 11:10:31 AM CDT, Shay Elbaz <
> shay.el...@gm.com> wrote:
>
>
> Thanks again Artemis, I really appreciate it. I have watched the video
> but did not find an answer.
>
> Please bear with me just one more iteration 
>
> Maybe I'll be more specific:
> Suppose I start the application with maxExecutors=500, executors.cores=2,
> because that's the amount of resources needed for the ETL part. But for the
> DL part I only need 20 GPUs. The SLS API only allows setting the resources per
> executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I
> configure the profile with 1 GPU per executor.
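
For illustration only, a hedged PySpark sketch of the profile described above: the 
SLS API expresses per-executor/per-task resources, so nothing in the profile itself 
caps the total number of GPU executors; with dynamic allocation and maxExecutors=500, 
Spark may request up to 500 of them unless the stage is limited to 20 tasks (e.g. via 
repartition(20)). The resource name "gpu" is an assumption about the cluster setup:

    from pyspark.resource import (ExecutorResourceRequests, TaskResourceRequests,
                                  ResourceProfileBuilder)

    # One GPU (and 2 cores) per executor, one GPU per task.
    gpu_profile = (ResourceProfileBuilder()
                   .require(ExecutorResourceRequests().cores(2).resource("gpu", 1))
                   .require(TaskResourceRequests().resource("gpu", 1))
                   .build)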

ClassCastException while reading parquet data via Hive metastore

2022-11-06 Thread Naresh Peshwe
Hi all,
I am trying to read data (using Spark SQL) via a Hive metastore table which has a
column of type bigint. The underlying parquet data has int as the datatype for
the same column. I am getting the following error while trying to read the
data using Spark SQL:

java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot
be cast to org.apache.hadoop.io.LongWritable
at 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector.get(WritableLongObjectInspector.java:36)
at 
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$6.apply(TableReader.scala:418)
...

I believe it is related to
https://issues.apache.org/jira/browse/SPARK-17477. Any suggestions on
how I can work around this issue?
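
For what it's worth, one possible workaround (a hedged, untested sketch, not a 
confirmed fix for SPARK-17477) is to bypass the Hive serde path by reading the 
parquet files directly from the table's storage location and casting the mismatched 
column; the path and the column name col_a below are illustrative placeholders:

    from pyspark.sql import functions as F

    # Read the files with Spark's native parquet reader, then cast the int
    # column up to bigint so it matches the metastore schema.
    df = (spark.read.parquet("/warehouse/path/to/the/table")
          .withColumn("col_a", F.col("col_a").cast("bigint")))
    df.createOrReplaceTempView("my_table_fixed")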

Spark version: 2.4.5

Regards,

Naresh