Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Shay Elbaz
Seen this before; had a very(!) complex plan behind a DataFrame, to the point 
where any additional transformation went OOM on the driver.

A quick and ugly solution was to break the plan - convert the DataFrame to an RDD 
and back to a DataFrame at certain points to make the plan shorter. This has obvious 
drawbacks, and is not recommended in general, but at least we had something 
working. The real, long-term solution was to replace the many (> 200) 
withColumn() calls with only a few select() calls. You can easily find sources on 
the internet explaining why this is better. (This was on Spark 2.3, but I think the main 
principles remain.) TBH, it was easier than I expected, as it mainly involved 
moving pieces of code from one place to another, and not a "real", meaningful 
refactoring.
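
A minimal PySpark sketch of both workarounds - breaking the lineage via an RDD 
round-trip, and collapsing many withColumn() calls into one select(). The toy 
DataFrame and column names are illustrative only, not the code from this thread:

```
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).toDF("id")

# Quick-and-ugly workaround: cut the lineage by round-tripping through an RDD,
# so later transformations start from a fresh, short logical plan.
df_short_plan = spark.createDataFrame(df.rdd, df.schema)

# Long-term fix: instead of hundreds of withColumn() calls (one new plan node each),
# build all the expressions up front and issue a single select.
new_cols = [(F.col("id") + i).alias(f"c{i}") for i in range(200)]
df_few_selects = df.select("*", *new_cols)
```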



From: Mich Talebzadeh 
Sent: Monday, May 27, 2024 15:43
Cc: user@spark.apache.org 
Subject: Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame 
Processing


This message contains hyperlinks, take precaution before opening these links.

A few ideas off the top of my head for how to go about solving the problem:


  1.  Try with subsets: Try reproducing the issue with smaller subsets of your 
data to pinpoint the specific operation causing the memory problems.
  2.  Explode or Flatten Nested Structures: If your DataFrame schema involves 
deep nesting, consider using techniques like explode or flattening to transform 
it into a less nested structure (see the short sketch after this list). This can 
reduce memory usage during operations like withColumn.
  3.  Lazy Evaluation: Use select before withColumn: this ensures lazy 
evaluation, meaning Spark only materializes the data when necessary. This can 
improve memory usage compared to directly calling withColumn on the entire 
DataFrame.
  4.  spark.sql.shuffle.partitions: Setting this configuration to a value close 
to the number of executors can improve shuffle performance and potentially 
reduce memory usage.
  5.  Spark UI Monitoring: Utilize the Spark UI to monitor memory usage 
throughout your job execution and identify potential memory bottlenecks.
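
A rough sketch of point 2 above (explode/flatten) on a toy nested schema - the 
data and column names are illustrative only:

```
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy nested record: an id with an array of structs.
df = spark.createDataFrame(
    [(1, [("a", 10), ("b", 20)])],
    "id INT, items ARRAY<STRUCT<k: STRING, v: INT>>",
)

# Explode the array and pull the nested fields up to top-level columns,
# producing a flatter schema for the subsequent transformations.
flat = (
    df.select("id", F.explode("items").alias("item"))
      .select("id", F.col("item.k").alias("k"), F.col("item.v").alias("v"))
)
flat.show()
```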

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed. It is essential to note that, as with any advice: 
"one test result is worth one-thousand expert opinions" (Wernher von Braun).



On Mon, 27 May 2024 at 12:50, Gaurav Madan 
 wrote:
Dear Community,

I'm reaching out to seek your assistance with a memory issue we've been facing 
while processing certain large and nested DataFrames using Apache Spark. We 
have encountered a scenario where the driver runs out of memory when applying 
the `withColumn` method on specific DataFrames in Spark 3.4.1. However, the 
same DataFrames are processed successfully in Spark 2.4.0.

Problem Summary:
For certain DataFrames, applying the `withColumn` method in Spark 3.4.1 causes 
the driver to choke and run out of memory. However, the same DataFrames are 
processed successfully in Spark 2.4.0.

Heap Dump Analysis:
We performed a heap dump analysis after enabling heap dump on out-of-memory 
errors, and the analysis revealed the following significant frames and local 
variables:

```
org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:4273)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:1622)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:2820)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:2759)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset; (DataPersistenceUtil.scala:88)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
```


Spark 3.1.3 with Hive dynamic partitions fails while driver moves the staged files

2023-12-11 Thread Shay Elbaz
Hi all,

Running on Dataproc 2.0/1.3/1.4, we use INSERT INTO OVERWRITE command to insert 
new (time) partitions into existing Hive tables. But we see too many failures 
coming from org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles. This is where 
the driver moves the successful files from staging to final directory.
For some reason, the underlying FS implementation - GoogleCloudStorageImpl in 
this case - fails to move at least one file and the exception is propagated all 
the way through. We see many different failures - from 
hadoop.FileSystem.mkdirs, rename, etc., all coming from Hive.replaceFiles().
I guess FS failures are expected, but nowhere in 
org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions there is 
try/catch/retry mechanism. Is it expected that the FS implementation do it?
Because the GCS connector does not seem to do that :)
We ended up patching and rebuilding hive-exec jar as an immediate mitigation 
(try/catch/retry), while our platform teams are reaching out to GCP support.

I know this is more of a Hive issue rather than Spark, but still I wonder if 
anybody has encountered this issue, or similar?

Thanks,
Shay







Re: [EXTERNAL] Re: Re: Incorrect csv parsing when delimiter used within the data

2023-01-04 Thread Shay Elbaz
If you have found a parser that works, simply read the data as text files, 
apply the parser manually, and convert to DataFrame (if needed at all).

From: Saurabh Gulati 
Sent: Wednesday, January 4, 2023 3:45 PM
To: Sean Owen 
Cc: Mich Talebzadeh ; User 
Subject: [EXTERNAL] Re: Re: Incorrect csv parsing when delimiter used within 
the data


ATTENTION: This email originated from outside of GM.


Hi @Sean Owen
Probably the data is incorrect, and the source needs to fix it.
But using python's csv parser returns the correct results.

import csv

with open("/tmp/test.csv") as c_file:
    csv_reader = csv.reader(c_file, delimiter=",")
    for row in csv_reader:
        print(row)

['a', 'b', 'c']
['1', '', ',see what "I did",\ni am still writing']
['2', '', 'abc']
Also, I don't understand why there is a distinction in outputs from 
df.show() and df.select("c").show()

Mvg/Regards
Saurabh Gulati
Data Platform

From: Sean Owen 
Sent: 04 January 2023 14:25
To: Saurabh Gulati 
Cc: Mich Talebzadeh ; User 
Subject: Re: [EXTERNAL] Re: Incorrect csv parsing when delimiter used within 
the data

That input is just invalid as CSV for any parser. You end a quoted col without 
following with a col separator. What would the intended parsing be and how 
would it work?

On Wed, Jan 4, 2023 at 4:30 AM Saurabh Gulati 
mailto:saurabh.gul...@fedex.com>> wrote:

@Sean Owen Also see the example below with quotes 
feedback:
"a","b","c"
"1","",",see what ""I did"","
"2","","abc"


Re: How to set a config for a single query?

2023-01-04 Thread Shay Elbaz
Hi Felipe,

I had the same problem - needed to execute multiple jobs/actions multithreaded, 
with slightly different sql configs per job (mainly 
spark.sql.shuffle.partitions). I'm not sure if this is the best solution, but I 
ended up using newSession() per thread. It works well except that the new 
SparkSession does not contain custom configurations from the original session. 
I had to re-apply the important configurations (catalogs, etc.) on the new 
Sessions as well. Hope that helps.
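
A PySpark-flavored sketch of the newSession()-per-thread approach described 
above; the job function and config values are illustrative only:

```
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

def run_job(shuffle_partitions):
    # newSession() shares the SparkContext but gets its own SQLConf, so each
    # thread can use a different spark.sql.shuffle.partitions value.
    s = spark.newSession()
    s.conf.set("spark.sql.shuffle.partitions", str(shuffle_partitions))
    # Custom configs (catalogs, etc.) are NOT inherited - re-apply them here.
    return s.range(1_000_000).groupBy((F.col("id") % 10).alias("k")).count().collect()

with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(run_job, [50, 400]))
```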

Shay

From: Saurabh Gulati 
Sent: Wednesday, January 4, 2023 11:54 AM
To: Felipe Pessoto ; user@spark.apache.org 

Subject: [EXTERNAL] Re: How to set a config for a single query?


ATTENTION: This email originated from outside of GM.


Hey Felipe,
Since you are collecting the dataframes, you might as well run them separately 
with desired configs and store them in your storage.

Regards
Saurabh

From: Felipe Pessoto 
Sent: 04 January 2023 01:14
To: user@spark.apache.org 
Subject: [EXTERNAL] How to set a config for a single query?

Caution! This email originated outside of FedEx. Please do not open attachments 
or click links from an unknown or suspicious origin.


Hi,



In Scala is it possible to set a config value to a single query?



I could set/unset the value, but it won’t work for multithreading scenarios.



Example:



spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
queryA_df.collect

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", originalValue) // restore previous value
queryB_df.collect
queryC_df.collect
queryD_df.collect





If I execute that block of code multiple times using multiple threads, I can end 
up executing Query A with coalescePartitions.enabled=true, and Queries B, C and 
D with the config set to false, because another thread could set it between the 
executions.



Is there any good alternative to this?



Thanks.


Re: [EXTERNAL] [SPARK Memory management] Does Spark support setting limits/requests for driver/executor memory ?

2022-12-08 Thread Shay Elbaz
Had the same issue, it seems that it is simply not possible - 
https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala#L195

There's also a Jira ticket - 
https://issues.apache.org/jira/browse/SPARK-37358?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20text%20~%20%22kubernetes%20request%20limit%22%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC




From: Yosr Kchaou 
Sent: Wednesday, December 7, 2022 10:19 AM
To: user@spark.apache.org 
Subject: [EXTERNAL] [SPARK Memory management] Does Spark support setting 
limits/requests for driver/executor memory ?


ATTENTION: This email originated from outside of GM.


Hello,

We are running Spark on Kubernetes and noticed that driver/executors use the 
same value for memory request and memory limit. We see that limits/requests can 
be set only for cpu using the following options: 
spark.kubernetes.{driver/executor}.limit.cores and 
spark.kubernetes.{driver/executor}.request.cores.

In our case, it is useful to set a memory request lower than the memory limit 
as it helps us better manage our cluster resources.

Is there a way to define limits/requests for memory resources for 
driver/executor pods ?

Thanks in advance.

Yosr




Re: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-14 Thread Shay Elbaz
We're actually running on on-prem Kubernetes with a custom-built Spark 
image, with altered entrypoint.sh and other "low-level" scripts and configs, 
but I don't think this is a good direction to solve this specific issue.

Shay

From: Artemis User 
Sent: Thursday, November 3, 2022 8:35 PM
To: user@spark.apache.org 
Subject: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs


ATTENTION: This email originated from outside of GM.

  Now I see what you want to do.  If you have access to the cluster 
configuration files, you can modify the spark-env.sh file on the worker nodes 
to specify exactly which nodes you'd like to link with GPU cores and which ones 
not.  This would allow only those nodes configured with GPU resources to get 
scheduled/acquired for your GPU tasks (see Rapids user guide at 
https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-on-prem.html).

We are using Rapids in our on-prem Spark environment with complete control of 
OS, file and network systems, containers and even hardware/GPU settings.  I 
guess you are using one of the cloud services so I am not sure if you have 
access to the low-level cluster config on EMR or GCP, which gave you a 
cookie-cutter type of cluster settings with limited configurability.  But under 
the hood, I believe they do use Nvidia Rapids which currently is the only 
option for GPU acceleration in Spark (Spark 3.x.x distribution package doesn't 
include Rapids or any GPU integration libs).  So you may want to dive into the 
Rapids instructions for more configuration and usage info (it does provide 
detailed instructions on how to run Rapids on EMR, Databricks and GCP).

On 11/3/22 12:10 PM, Shay Elbaz wrote:
Thanks again Artemis, I really appreciate it. I have watched the video but did 
not find an answer.

Please bear with me just one more iteration 

Maybe I'll be more specific:
Suppose I start the application with maxExecutors=500, executors.cores=2, 
because that's the amount of resources needed for the ETL part. But for the DL 
part I only need 20 GPUs. SLS API only allows to set the resources per 
executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I 
configure the profile with 1 GPU per executor.
So, the question is how do I limit the stage resources to 20 GPUs total?

Thanks again,
Shay


From: Artemis User <mailto:arte...@dtechspace.com>
Sent: Thursday, November 3, 2022 5:23 PM
To: user@spark.apache.org<mailto:user@spark.apache.org> 
<mailto:user@spark.apache.org>
Subject: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs


ATTENTION: This email originated from outside of GM.

  Shay,  You may find this video helpful (with some API code samples that you 
are looking for).  https://www.youtube.com/watch?v=JNQu-226wUc=171s.  The 
issue here isn't how to limit the number of executors but to request for the 
right GPU-enabled executors dynamically.  Those executors used in pre-GPU 
stages should be returned back to resource managers with dynamic resource 
allocation enabled (and with the right DRA policies).  Hope this helps..

Unfortunately there isn't a lot of detailed docs for this topic since GPU 
acceleration is kind of new in Spark (not straightforward like in TF).   I wish 
the Spark doc team could provide more details in the next release...

On 11/3/22 2:37 AM, Shay Elbaz wrote:
Thanks Artemis. We are not using Rapids, but rather using GPUs through the 
Stage Level Scheduling feature with ResourceProfile. In Kubernetes you have to 
turn on shuffle tracking for dynamic allocation, anyhow.
The question is how we can limit the number of executors when building a new 
ResourceProfile, directly (API) or indirectly (some advanced workaround).

Thanks,
Shay



From: Artemis User <mailto:arte...@dtechspace.com>
Sent: Thursday, November 3, 2022 1:16 AM
To: user@spark.apache.org<mailto:user@spark.apache.org> 
<mailto:user@spark.apache.org>
Subject: [EXTERNAL] Re: Stage level scheduling - lower the number of executors 
when using GPUs


ATTENTION: This email originated from outside of GM.

  Are you using Rapids for GPU support in Spark?  Couple of options you may 
want to try:

  1.  In addition to dynamic allocation turned on, you may also need to turn on 
external shuffling service.
  2.  Sounds like you are using Kubernetes.  In that case, you may also need to 
turn on shuffle tracking.
  3.  The "stages" are controlled by the APIs.  The APIs for dynamic resource 
request (change of stage) do exist, but only for RDDs (e.g. TaskResourceRequest 
and ExecutorResourceRequest).

On 11/2/22 11:30 AM, Shay Elbaz wrote:
Hi,

Our typical applications need less executors for a GPU stage than for a CPU 
stage. We are using dynamic allocation with stage level scheduling, and Spark 
tries

Re: [EXTERNAL] Re: Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-06 Thread Shay Elbaz
I don't think there is a definitive right or wrong approach here. The SLS 
feature would not have been added to Spark if there was no real need for it, 
and AFAIK it required quite a bit of refactoring of Spark internals. So I'm sure 
this discussion has already been had in the developer community :)

In my specific case, I need it also for interactive dev/research sessions on 
Jupyter notebooks, and it makes more sense to switch resources than to stop the 
session and start a new one (over and over again).

Shay

From: ayan guha 
Sent: Sunday, November 6, 2022 4:19 PM
To: Shay Elbaz 
Cc: Artemis User ; Tom Graves ; 
Tom Graves ; user@spark.apache.org 

Subject: [EXTERNAL] Re: Re: Re: Re: Re: Stage level scheduling - lower the 
number of executors when using GPUs


ATTENTION: This email originated from outside of GM.


May I ask why the ETL job and DL ( Assuming you mean deep learning here) task 
can not be run as 2 separate spark job?

IMHO it is better practice to split up entire pipeline into logical steps and 
orchestrate them.

That way you can pick your profile as you need for 2 very different type of 
workloads.

Ayan

On Sun, 6 Nov 2022 at 12:04 am, Shay Elbaz 
mailto:shay.el...@gm.com>> wrote:
Consider this:

  1.  The application is allowed to use only 20 GPUs.
  2.  To ensure exactly 20 GPUs, I use the 
df.rdd.repartition(20).withResources(gpus.build).mapPartitions(func) technique. 
(maxExecutors >> 20).
  3.  Given the volume of the input data, it takes 20 hours total to run the DL 
part (computer vision) on 20 GPUs, or 1 hour per GPU task.

Normally, I would repartition to 200 partitions to get finer-grained ~6-minute 
tasks instead of 1-hour ones. But here we're "forced" to use only 20 
partitions. To be clear, I'm only referring to potential failures/lags here. 
The job needs at least 20 hours total (on 20 GPUs) no matter what, but if any 
task fails after 50 minutes for example, we have to re-process these 50 minutes 
again. Or if a task/executor lags behind due to environment issues, then 
speculative execution will only trigger another task after 1 hour. These issues 
would be avoided if we used 200 partitions, but then Spark will try to allocate 
more than 20 GPUs.

I hope that was more clear.
Thank you very much for helping.
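
For reference, a minimal PySpark (3.1+) sketch of the repartition + 
ResourceProfile technique from item 2 above; the resource amounts, discovery 
script path and the inference function are placeholders:

```
from pyspark.sql import SparkSession
from pyspark.resource import (ResourceProfileBuilder, ExecutorResourceRequests,
                              TaskResourceRequests)

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000)                      # stand-in for the real ETL output

exec_reqs = (ExecutorResourceRequests()
             .cores(2)
             .resource("gpu", 1, discoveryScript="/opt/getGpus.sh"))
task_reqs = TaskResourceRequests().cpus(2).resource("gpu", 1)
gpu_profile = ResourceProfileBuilder().require(exec_reqs).require(task_reqs).build

def run_inference(rows):                    # placeholder for the DL / CV code
    for row in rows:
        yield row

result = (df.rdd
            .repartition(20)                # exactly 20 GPU tasks
            .withResources(gpu_profile)     # stage-level scheduling profile
            .mapPartitions(run_inference))
```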

Shay


From: Tom Graves mailto:tgraves...@yahoo.com>>
Sent: Friday, November 4, 2022 4:19 PM
To: Tom Graves ; Artemis User 
mailto:arte...@dtechspace.com>>; 
user@spark.apache.org<mailto:user@spark.apache.org> 
mailto:user@spark.apache.org>>; Shay Elbaz 
mailto:shay.el...@gm.com>>
Subject: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number 
of executors when using GPUs


ATTENTION: This email originated from outside of GM.


So I'm not sure I completely follow. Are you asking for a way to change the 
limit without having to do the repartition?  And your DL software doesn't care 
if you got say 30 executors instead of 20?  Normally I would expect the number 
of partitions at that point to be 200 (or whatever you set for your shuffle 
partitions) unless you are using AQE coalescing partitions functionality and 
then it could change. Are you using the latter?

> Normally I try to aim for anything between 30s-5m per task (failure-wise), 
> depending on the cluster, its stability, etc. But in this case, individual 
> tasks can take 30-60 minutes, if not much more. Any failure during this long 
> time is pretty expensive.

Are you saying when you manually do the repartition your DL tasks take 30-60 
minutes?  So again you want something like AQE coalesce partitions to kick in to 
attempt to pick partition sizes for you?


Tom

On Thursday, November 3, 2022 at 03:18:07 PM CDT, Shay Elbaz 
mailto:shay.el...@gm.com>> wrote:


This is exactly what we ended up doing! The only drawback I saw with this 
approach is that the GPU tasks get pretty big (in terms of data and compute 
time), and task failures become expensive. That's why I reached out to the 
mailing list in the first place 
Normally I try to aim for anything between 30s-5m per task (failure-wise), 
depending on the cluster, its stability, etc. But in this case, individual 
tasks can take 30-60 minutes, if not much more. Any failure during this long 
time is pretty expensive.


Shay

From: Tom Graves 
Sent: Thursday, November 3, 2022 7:56 PM
To: Artemis User mailto:arte...@dtechspace.com>>; 
user@spark.apache.org<mailto:user@spark.apache.org> 
mailto:user@spark.apache.org>>; Shay Elbaz 
mailto:shay.el...@gm.com>>
Subject: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs


ATTENTION: This email originated from outside of GM.


Stage level scheduling does not allow you to change configs right now. This is 
something we thought about as a follow-on but have never implemented.

Re: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-05 Thread Shay Elbaz
Consider this:

  1.  The application is allowed to use only 20 GPUs.
  2.  To ensure exactly 20 GPUs, I use the 
df.rdd.repartition(20).withResources(gpus.build).mapPartitions(func) technique. 
(maxExecutors >> 20).
  3.  Given the volume of the input data, it takes 20 hours total to run the DL 
part (computer vision) on 20 GPUs, or 1 hour per GPU task.

Normally, I would repartition to 200 partitions to get finer-grained ~6-minute 
tasks instead of 1-hour ones. But here we're "forced" to use only 20 
partitions. To be clear, I'm only referring to potential failures/lags here. 
The job needs at least 20 hours total (on 20 GPUs) no matter what, but if any 
task fails after 50 minutes for example, we have to re-process these 50 minutes 
again. Or if a task/executor lags behind due to environment issues, then 
speculative execution will only trigger another task after 1 hour. These issues 
would be avoided if we used 200 partitions, but then Spark will try to allocate 
more than 20 GPUs.

I hope that was more clear.
Thank you very much for helping.

Shay


From: Tom Graves 
Sent: Friday, November 4, 2022 4:19 PM
To: Tom Graves ; Artemis User 
; user@spark.apache.org ; Shay 
Elbaz 
Subject: [EXTERNAL] Re: Re: Re: Re: Stage level scheduling - lower the number 
of executors when using GPUs


ATTENTION: This email originated from outside of GM.


So I'm not sure I completely follow. Are you asking for a way to change the 
limit without having to do the repartition?  And your DL software doesn't care 
if you got say 30 executors instead of 20?  Normally I would expect the number 
of partitions at that point to be 200 (or whatever you set for your shuffle 
partitions) unless you are using AQE coalescing partitions functionality and 
then it could change. Are you using the latter?

> Normally I try to aim for anything between 30s-5m per task (failure-wise), 
> depending on the cluster, its stability, etc. But in this case, individual 
> tasks can take 30-60 minutes, if not much more. Any failure during this long 
> time is pretty expensive.

Are you saying when you manually do the repartition your DL tasks take 30-60 
minutes?  So again you want something like AQE coalesce partitions to kick in to 
attempt to pick partition sizes for you?


Tom

On Thursday, November 3, 2022 at 03:18:07 PM CDT, Shay Elbaz 
 wrote:


This is exactly what we ended up doing! The only drawback I saw with this 
approach is that the GPU tasks get pretty big (in terms of data and compute 
time), and task failures become expensive. That's why I reached out to the 
mailing list in the first place 
Normally I try to aim for anything between 30s-5m per task (failure-wise), 
depending on the cluster, its stability, etc. But in this case, individual 
tasks can take 30-60 minutes, if not much more. Any failure during this long 
time is pretty expensive.


Shay

From: Tom Graves 
Sent: Thursday, November 3, 2022 7:56 PM
To: Artemis User ; user@spark.apache.org 
; Shay Elbaz 
Subject: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs


ATTENTION: This email originated from outside of GM.


Stage level scheduling does not allow you to change configs right now. This is 
something we thought about as follow on but have never implemented.  How many 
tasks on the DL stage are you running?  The typical case is run some etl lots 
of tasks... do mapPartitions and then run your DL stuff, before that 
mapPartitions you could do a repartition if necessary to get to exactly the 
number of tasks you want (20).  That way even if maxExecutors=500 you will only 
ever need 20 or whatever you repartition to and spark isn't going to ask for 
more then that.

Tom

On Thursday, November 3, 2022 at 11:10:31 AM CDT, Shay Elbaz 
 wrote:


Thanks again Artemis, I really appreciate it. I have watched the video but did 
not find an answer.

Please bear with me just one more iteration 

Maybe I'll be more specific:
Suppose I start the application with maxExecutors=500, executors.cores=2, 
because that's the amount of resources needed for the ETL part. But for the DL 
part I only need 20 GPUs. SLS API only allows to set the resources per 
executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I 
configure the profile with 1 GPU per executor.
So, the question is how do I limit the stage resources to 20 GPUs total?

Thanks again,
Shay


From: Artemis User 
Sent: Thursday, November 3, 2022 5:23 PM

To: user@spark.apache.org 
Subject: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs


ATTENTION: This email originated from outside of GM.

  Shay,  You may find this video helpful (with some API code samples that you 
are looking for).  https://www.youtube.com/watch?v=JNQu-226wUc=171s.  The 
issue here isn't how to limit the number of executors but to request for the 
right GPU-enabled executors dynamically.

Re: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-03 Thread Shay Elbaz
This is exactly what we ended up doing! The only drawback I saw with this 
approach is that the GPU tasks get pretty big (in terms of data and compute 
time), and task failures become expensive. That's why I reached out to the 
mailing list in the first place 
Normally I try to aim for anything between 30s-5m per task (failure-wise), 
depending on the cluster, its stability, etc. But in this case, individual 
tasks can take 30-60 minutes, if not much more. Any failure during this long 
time is pretty expensive.


Shay

From: Tom Graves 
Sent: Thursday, November 3, 2022 7:56 PM
To: Artemis User ; user@spark.apache.org 
; Shay Elbaz 
Subject: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs


ATTENTION: This email originated from outside of GM.


Stage level scheduling does not allow you to change configs right now. This is 
something we thought about as follow on but have never implemented.  How many 
tasks on the DL stage are you running?  The typical case is run some etl lots 
of tasks... do mapPartitions and then run your DL stuff, before that 
mapPartitions you could do a repartition if necessary to get to exactly the 
number of tasks you want (20).  That way even if maxExecutors=500 you will only 
ever need 20 or whatever you repartition to and spark isn't going to ask for 
more then that.

Tom

On Thursday, November 3, 2022 at 11:10:31 AM CDT, Shay Elbaz 
 wrote:


Thanks again Artemis, I really appreciate it. I have watched the video but did 
not find an answer.

Please bear with me just one more iteration 

Maybe I'll be more specific:
Suppose I start the application with maxExecutors=500, executors.cores=2, 
because that's the amount of resources needed for the ETL part. But for the DL 
part I only need 20 GPUs. SLS API only allows to set the resources per 
executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I 
configure the profile with 1 GPU per executor.
So, the question is how do I limit the stage resources to 20 GPUs total?

Thanks again,
Shay


From: Artemis User 
Sent: Thursday, November 3, 2022 5:23 PM

To: user@spark.apache.org 
Subject: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs


ATTENTION: This email originated from outside of GM.

  Shay,  You may find this video helpful (with some API code samples that you 
are looking for).  https://www.youtube.com/watch?v=JNQu-226wUc=171s.  The 
issue here isn't how to limit the number of executors but to request for the 
right GPU-enabled executors dynamically.  Those executors used in pre-GPU 
stages should be returned back to resource managers with dynamic resource 
allocation enabled (and with the right DRA policies).  Hope this helps..

Unfortunately there isn't a lot of detailed docs for this topic since GPU 
acceleration is kind of new in Spark (not straightforward like in TF).   I wish 
the Spark doc team could provide more details in the next release...

On 11/3/22 2:37 AM, Shay Elbaz wrote:
Thanks Artemis. We are not using Rapids, but rather using GPUs through the 
Stage Level Scheduling feature with ResourceProfile. In Kubernetes you have to 
turn on shuffle tracking for dynamic allocation, anyhow.
The question is how we can limit the number of executors when building a new 
ResourceProfile, directly (API) or indirectly (some advanced workaround).

Thanks,
Shay



From: Artemis User <mailto:arte...@dtechspace.com>
Sent: Thursday, November 3, 2022 1:16 AM
To: user@spark.apache.org<mailto:user@spark.apache.org> 
<mailto:user@spark.apache.org>
Subject: [EXTERNAL] Re: Stage level scheduling - lower the number of executors 
when using GPUs


ATTENTION: This email originated from outside of GM.

  Are you using Rapids for GPU support in Spark?  Couple of options you may 
want to try:

  1.  In addition to dynamic allocation turned on, you may also need to turn on 
external shuffling service.
  2.  Sounds like you are using Kubernetes.  In that case, you may also need to 
turn on shuffle tracking.
  3.  The "stages" are controlled by the APIs.  The APIs for dynamic resource 
request (change of stage) do exist, but only for RDDs (e.g. TaskResourceRequest 
and ExecutorResourceRequest).

On 11/2/22 11:30 AM, Shay Elbaz wrote:
Hi,

Our typical applications need less executors for a GPU stage than for a CPU 
stage. We are using dynamic allocation with stage level scheduling, and Spark 
tries to maximize the number of executors also during the GPU stage, causing a 
bit of resources chaos in the cluster. This forces us to use a lower value for 
'maxExecutors' in the first place, at the cost of the CPU stages performance. 
Or try to solve this at the Kubernetes scheduler level, which is not 
straightforward and doesn't feel like the right way to go.

Is there a way to effectively use fewer executors in Stage Level Scheduling?

Re: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-03 Thread Shay Elbaz
Thanks again Artemis, I really appreciate it. I have watched the video but did 
not find an answer.

Please bear with me just one more iteration 

Maybe I'll be more specific:
Suppose I start the application with maxExecutors=500, executors.cores=2, 
because that's the amount of resources needed for the ETL part. But for the DL 
part I only need 20 GPUs. SLS API only allows to set the resources per 
executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I 
configure the profile with 1 GPU per executor.
So, the question is how do I limit the stage resources to 20 GPUs total?

Thanks again,
Shay


From: Artemis User 
Sent: Thursday, November 3, 2022 5:23 PM
To: user@spark.apache.org 
Subject: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs


ATTENTION: This email originated from outside of GM.

  Shay,  You may find this video helpful (with some API code samples that you 
are looking for).  https://www.youtube.com/watch?v=JNQu-226wUc=171s.  The 
issue here isn't how to limit the number of executors but to request for the 
right GPU-enabled executors dynamically.  Those executors used in pre-GPU 
stages should be returned back to resource managers with dynamic resource 
allocation enabled (and with the right DRA policies).  Hope this helps..

Unfortunately there isn't a lot of detailed docs for this topic since GPU 
acceleration is kind of new in Spark (not straightforward like in TF).   I wish 
the Spark doc team could provide more details in the next release...

On 11/3/22 2:37 AM, Shay Elbaz wrote:
Thanks Artemis. We are not using Rapids, but rather using GPUs through the 
Stage Level Scheduling feature with ResourceProfile. In Kubernetes you have to 
turn on shuffle tracking for dynamic allocation, anyhow.
The question is how we can limit the number of executors when building a new 
ResourceProfile, directly (API) or indirectly (some advanced workaround).

Thanks,
Shay



From: Artemis User <mailto:arte...@dtechspace.com>
Sent: Thursday, November 3, 2022 1:16 AM
To: user@spark.apache.org<mailto:user@spark.apache.org> 
<mailto:user@spark.apache.org>
Subject: [EXTERNAL] Re: Stage level scheduling - lower the number of executors 
when using GPUs


ATTENTION: This email originated from outside of GM.

  Are you using Rapids for GPU support in Spark?  Couple of options you may 
want to try:

  1.  In addition to dynamic allocation turned on, you may also need to turn on 
external shuffling service.
  2.  Sounds like you are using Kubernetes.  In that case, you may also need to 
turn on shuffle tracking.
  3.  The "stages" are controlled by the APIs.  The APIs for dynamic resource 
request (change of stage) do exist, but only for RDDs (e.g. TaskResourceRequest 
and ExecutorResourceRequest).

On 11/2/22 11:30 AM, Shay Elbaz wrote:
Hi,

Our typical applications need less executors for a GPU stage than for a CPU 
stage. We are using dynamic allocation with stage level scheduling, and Spark 
tries to maximize the number of executors also during the GPU stage, causing a 
bit of resources chaos in the cluster. This forces us to use a lower value for 
'maxExecutors' in the first place, at the cost of the CPU stages performance. 
Or try to solve this at the Kubernetes scheduler level, which is not 
straightforward and doesn't feel like the right way to go.

Is there a way to effectively use less executors in Stage Level Scheduling? The 
API does not seem to include such an option, but maybe there is some more 
advanced workaround?

Thanks,
Shay









Re: [EXTERNAL] Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-03 Thread Shay Elbaz
Thanks Artemis. We are not using Rapids, but rather using GPUs through the 
Stage Level Scheduling feature with ResourceProfile. In Kubernetes you have to 
turn on shuffle tracking for dynamic allocation, anyhow.
The question is how we can limit the number of executors when building a new 
ResourceProfile, directly (API) or indirectly (some advanced workaround).

Thanks,
Shay



From: Artemis User 
Sent: Thursday, November 3, 2022 1:16 AM
To: user@spark.apache.org 
Subject: [EXTERNAL] Re: Stage level scheduling - lower the number of executors 
when using GPUs


ATTENTION: This email originated from outside of GM.

  Are you using Rapids for GPU support in Spark?  Couple of options you may 
want to try:

  1.  In addition to dynamic allocation turned on, you may also need to turn on 
external shuffling service.
  2.  Sounds like you are using Kubernetes.  In that case, you may also need to 
turn on shuffle tracking.
  3.  The "stages" are controlled by the APIs.  The APIs for dynamic resource 
request (change of stage) do exist, but only for RDDs (e.g. TaskResourceRequest 
and ExecutorResourceRequest).

On 11/2/22 11:30 AM, Shay Elbaz wrote:
Hi,

Our typical applications need less executors for a GPU stage than for a CPU 
stage. We are using dynamic allocation with stage level scheduling, and Spark 
tries to maximize the number of executors also during the GPU stage, causing a 
bit of resources chaos in the cluster. This forces us to use a lower value for 
'maxExecutors' in the first place, at the cost of the CPU stages performance. 
Or try to solve this at the Kubernetes scheduler level, which is not 
straightforward and doesn't feel like the right way to go.

Is there a way to effectively use less executors in Stage Level Scheduling? The 
API does not seem to include such an option, but maybe there is some more 
advanced workaround?

Thanks,
Shay








Stage level scheduling - lower the number of executors when using GPUs

2022-11-02 Thread Shay Elbaz
Hi,

Our typical applications need less executors for a GPU stage than for a CPU 
stage. We are using dynamic allocation with stage level scheduling, and Spark 
tries to maximize the number of executors also during the GPU stage, causing a 
bit of resources chaos in the cluster. This forces us to use a lower value for 
'maxExecutors' in the first place, at the cost of the CPU stages performance. 
Or try to solve this at the Kubernetes scheduler level, which is not 
straightforward and doesn't feel like the right way to go.

Is there a way to effectively use less executors in Stage Level Scheduling? The 
API does not seem to include such an option, but maybe there is some more 
advanced workaround?

Thanks,
Shay







PySpark schema sanitization

2022-08-14 Thread Shay Elbaz
Hi,

I have a simple ETL application, where the data source schema needs to be 
sanitized. Column names might include special characters that need to be 
removed. For example, from "some{column}" to "some_column".
Normally I'd just alias the columns, but in this case the schema can have 
thousands of deeply nested columns. Creating a new StructType feels more 
intuitive and simpler, but the only way I know of to apply the new schema is to 
create a new dataframe -
spark.createDataFrame(old_df.rdd, new_schema). This makes the deserialization 
and re-serialization of the dataframe the most expensive operation in that 
"simple" ETL app.

To make things worse, since it's a pyspark application, the RDD is treated as 
Python RDD and all the data is moving from the JVM to Python and back, without 
any real transformation.
This is resolved by creating the new DF on the JVM only:

jschema = spark._sc._jvm.org.apache.spark.sql.types.DataType.fromJson(sanitized_schema.json())
sanitized_df = DataFrame(spark._jsparkSession.createDataFrame(df._jdf.rdd(), jschema), spark)

Is there another way to do a bulk rename operation? I'd like to avoid creating 
some uber "select" statement with aliases, or multiple withColumnRenamed 
operations, as much as possible, mainly for maintenance reasons.
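
For what it's worth, a hedged sketch of one way to build the sanitized schema 
recursively and then apply it with the JVM-side trick above; the sanitize rule 
(replace non-alphanumerics with "_") is only an example:

```
import re
from pyspark.sql.types import StructType, StructField, ArrayType, MapType, DataType

def sanitize_name(name: str) -> str:
    return re.sub(r"[^0-9a-zA-Z_]", "_", name)

def sanitize_type(dt: DataType) -> DataType:
    # Walk the type tree and rename struct fields at every nesting level.
    if isinstance(dt, StructType):
        return StructType([
            StructField(sanitize_name(f.name), sanitize_type(f.dataType), f.nullable, f.metadata)
            for f in dt.fields
        ])
    if isinstance(dt, ArrayType):
        return ArrayType(sanitize_type(dt.elementType), dt.containsNull)
    if isinstance(dt, MapType):
        return MapType(sanitize_type(dt.keyType), sanitize_type(dt.valueType), dt.valueContainsNull)
    return dt

# old_df is the original DataFrame from the message above.
sanitized_schema = sanitize_type(old_df.schema)
```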

Thanks


Re: [EXTERNAL] Partial data with ADLS Gen2

2022-07-24 Thread Shay Elbaz
This is a known issue. Apache Iceberg, Hudi and Delta Lake are among the 
possible solutions.
Alternatively, instead of writing the output directly to the "official" 
location, write it to some staging directory instead. Once the job is done, 
rename the staging dir to the official location.
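
A rough sketch of the staging-then-rename idea, using the Hadoop FileSystem API 
through the JVM gateway; the paths are placeholders and error handling is omitted:

```
staging = "abfss://container@account.dfs.core.windows.net/tmp/_staging/run_123"
final   = "abfss://container@account.dfs.core.windows.net/warehouse/my_table/dt=2022-07-24"

# df and spark come from the application; write everything to the staging location first.
df.write.mode("overwrite").parquet(staging)

# Only expose the data once the whole job has succeeded.
jvm = spark._jvm
conf = spark._jsc.hadoopConfiguration()
Path = jvm.org.apache.hadoop.fs.Path
fs = Path(final).getFileSystem(conf)
fs.delete(Path(final), True)
fs.rename(Path(staging), Path(final))
```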

From: kineret M 
Sent: Sunday, July 24, 2022 1:06 PM
To: user@spark.apache.org 
Subject: [EXTERNAL] Partial data with ADLS Gen2


ATTENTION: This email originated from outside of GM.


I have spark batch application writing to ADLS Gen2 (hierarchy).
When designing the application I was sure Spark would perform a global commit 
once the job is committed, but what it really does is commit on each task: once a 
task completes writing, its output moves from the temp to the target storage. So 
if the batch fails we have partial data, and on retry we get data duplications.
Our scale is really huge, so rolling back (deleting data) is not an option for 
us; the search would take a lot of time.
Is there any "build in" solution, something we can use out of the box?

Thanks.


Re: spark.executor.pyspark.memory not added to the executor resource request on Kubernetes

2022-07-19 Thread Shay Elbaz
... using spark 3.2.1

From: Shay Elbaz 
Sent: Tuesday, July 19, 2022 1:26 PM
To: user@spark.apache.org 
Cc: Jeffrey O'Donoghue 
Subject: [EXTERNAL] spark.executor.pyspark.memory not added to the executor 
resource request on Kubernetes


ATTENTION: This email originated from outside of GM.


Hi,

We are trying to tune executor memory on Kubernetes. Specifically, 8g for the 
JVM, 8g for the Python process, and an additional 500m overhead:
--conf spark.executor.memory=8g
--conf spark.executor.pyspark.memory=8g
--conf spark.executor.memoryOverhead=500m

 According to the docs, the executor pods should have 8+8+0.5 GB of requested memory 
(spark.executor.pyspark.memory: "... When PySpark is run in YARN or Kubernetes, 
this memory is added to executor resource requests").
On Spark UI we can see the right configuration:

Executor Reqs:
  cores: [amount: 1]
  offHeap: [amount: 0]
  memoryOverhead: [amount: 500]
  pyspark.memory: [amount: 8192]
  memory: [amount: 8192]
Task Reqs:
  cpus: [amount: 1.0]


However, the running pod spec is different:
Limits:
  memory:  8692Mi
Requests:
  cpu: 1
  memory:  8692Mi



Looks like the pyspark.memory value was not added to the resource request 
(8692Mi = 8192 + 500; the 8192 for PySpark is missing).
What are we missing?

Thanks,
Shay



spark.executor.pyspark.memory not added to the executor resource request on Kubernetes

2022-07-19 Thread Shay Elbaz
Hi,

We are trying to tune executor memory on Kubernetes. Specifically, 8g for the 
JVM, 8g for the Python process, and an additional 500m overhead:
--conf spark.executor.memory=8g
--conf spark.executor.pyspark.memory=8g
--conf spark.executor.memoryOverhead=500m

 According to the docs, the executor pods should have 8+8+0.5 GB of requested memory 
(spark.executor.pyspark.memory: "... When PySpark is run in YARN or Kubernetes, 
this memory is added to executor resource requests").
On Spark UI we can see the right configuration:

Executor Reqs:
  cores: [amount: 1]
  offHeap: [amount: 0]
  memoryOverhead: [amount: 500]
  pyspark.memory: [amount: 8192]
  memory: [amount: 8192]
Task Reqs:
  cpus: [amount: 1.0]


However, the running pod spec is different:
Limits:
  memory:  8692Mi
Requests:
  cpu: 1
  memory:  8692Mi



Looks like the pyspark.memory value was not added to the resource request.
What are we missing?

Thanks,
Shay



Re: [EXTERNAL] spark re-use shuffle files not happening

2022-07-16 Thread Shay Elbaz
Spark can reuse shuffle stages within the same job (action), not across jobs.

From: Koert Kuipers 
Sent: Saturday, July 16, 2022 6:43 PM
To: user 
Subject: [EXTERNAL] spark re-use shuffle files not happening


ATTENTION: This email originated from outside of GM.


i have seen many jobs where spark re-uses shuffle files (and skips a stage of a 
job), which is an awesome feature given how expensive shuffles are, and i 
generally now assume this will happen.

however i feel like i am going a little crazy today. i did the simplest test in 
spark 3.3.0, basically i run 2 jobs within same spark shell, so using same 
spark session, and broadcast join is disabled so we get shuffles:
1) job1 joins dataframe1 with dataframe0 and writes results out.
2) job2 joins dataframe2 with dataframe0 and writes results out.

i would expect job2 to skip the stage where dataframe0 is getting shuffled, but 
its not skipping it! what am i missing?
is shuffle re-use only enabled within same job/action? that goes against what i 
remember...

code:
$ spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
scala> val data0 = spark.read.format("csv").option("header", true).load("data0.csv")
scala> val data1 = spark.read.format("csv").option("header", true).load("data1.csv")
scala> val data2 = spark.read.format("csv").option("header", true).load("data2.csv")
scala> data1.join(data0, "key").write.format("parquet").save("out1")
scala> data2.join(data0, "key").write.format("parquet").save("out2") // should skip stage that scans csv for data0 and writes shuffle files... but it doesn't





Re: [EXTERNAL] RDD.pipe() for binary data

2022-07-10 Thread Shay Elbaz
Yuhao,


You can use pyspark as entrypoint to your application. With py4j you can call 
Java/Scala functions from the python application. There's no need to use the 
pipe() function for that.
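
A hedged illustration of that route: drive the application from PySpark and call 
into JVM-side code through spark._jvm. Note that com.example.VectorOps and its 
method are made-up names for a class you would ship via --jars:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Any class on the driver classpath is reachable; e.g. a plain JDK call:
now_ms = spark._jvm.java.lang.System.currentTimeMillis()

# Hypothetical Scala object holding the Spark-specific logic, passed the Scala SparkContext:
total = spark._jvm.com.example.VectorOps.sumOfSquares(spark._jsc.sc(), "hdfs:///data/vectors")
```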


Shay


From: Yuhao Zhang 
Sent: Saturday, July 9, 2022 4:13:42 AM
To: user@spark.apache.org
Subject: [EXTERNAL] RDD.pipe() for binary data

ATTENTION: This email originated from outside of GM.



Hi All,

I'm currently working on a project involving transferring data between Spark 3.x (I 
use Scala) and a Python runtime. In Spark, data is stored in an RDD as 
floating-point number arrays/vectors and I have custom routines written in 
Python to process them. On the Spark side, I also have some operations specific 
to Spark Scala APIs, so I need to use both runtimes.

Now to achieve data transfer I've been using the RDD.pipe() API, by 1. 
converting the arrays to strings in Spark and calling RDD.pipe(script.py) 2. 
Then Python receives the strings and casts them as Python's data structures and 
conducts operations. 3. Python converts the arrays into strings and prints them 
back to Spark. 4. Spark gets the strings and cast them back as arrays.

Needless to say, this feels unnatural and slow to me, and there are some 
potential floating-point number precision issues, as I think the floating 
number arrays should have been transmitted as raw bytes. I found no way to use 
the RDD.pipe() for this purpose, as written in 
https://github.com/apache/spark/blob/3331d4ccb7df9aeb1972ed86472269a9dbd261ff/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala#L139,
 .pipe() seems to be locked with text-based streaming.

Can anyone shed some light on how I can achieve this? I'm trying to come up 
with a way that does not involve modifying the core Spark myself. One potential 
solution I can think of is saving/loading the RDD as binary files but I'm 
hoping to find a streaming-based solution. Any help is much appreciated, thanks!


Best regards,
Yuhao


How to update TaskMetrics from Python?

2022-06-16 Thread Shay Elbaz
Hi All,


I have some data output source which can only be written to by a specific 
Python API. For that I am (ab)using foreachPartition(writing_func) from PySpark 
which works pretty well.

I wonder if it's possible to somehow update the task metrics - specifically 
setBytesWritten - at the end of every partition. On the surface it seems 
impossible to me, for 2 reasons:

  1.  I don't think there is an open py4j gateway in a task context
  2.  TaskMetrics is accessed via ThreadLocal, so even with an open gateway I 
don't think it'll hit the specific thread


Does anyone know of existing solution or a workaround?


Thanks

Shay


RE: [EXTERNAL] Re: Spark on K8s - repeating annoying exception

2022-05-15 Thread Shay Elbaz
Hi Martin,

Thanks for the help :) I tried setting those keys to a high value, but the error 
persists every 90 seconds.


Shay


From: Martin Grigorov 
Sent: Friday, May 13, 2022 4:15 PM
To: Shay Elbaz 
Cc: user@spark.apache.org
Subject: [EXTERNAL] Re: Spark on K8s - repeating annoying exception

ATTENTION: This email originated from outside of GM.



Hi,

On Mon, May 9, 2022 at 5:57 PM Shay Elbaz 
mailto:shay.el...@gm.com>> wrote:
Hi all,

I apologize for reposting this from Stack Overflow, but it got very little 
attention and no comments.

I'm using Spark 3.2.1 image that was built from the official distribution via 
`docker-image-tool.sh', on Kubernetes 1.18 cluster.
Everything works fine, except for this error message on stdout every 90 seconds:

Wild guess: K8S API polling ?!

https://spark.apache.org/docs/latest/running-on-kubernetes.html#spark-properties

- spark.kubernetes.executor.apiPollingInterval
- spark.kubernetes.executor.missingPodDetectDelta

but for both settings the default is 30s, not 90s



WARN WatcherWebSocketListener: Exec Failure
java.io.EOFException
at okio.RealBufferedSource.require(RealBufferedSource.java:61)
at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
at 
okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

This message does not affect the application, but it is really annoying, 
especially for Jupyter users. The lack of details makes it very hard to debug.
It appears on any submit variation - spark-submit, pyspark, spark-shell.
I've found traces of it on the internet, but all occurrences were from older 
versions of Spark and were resolved by using a "newer" version of fabric8 (4.x).
Spark 3.2.1 already uses fabric8 version 5.4.1.
I wonder if anyone else still sees this error in Spark 3.x, and has a 
resolution.

Thanks,
Shay.


Spark on K8s - repeating annoying exception

2022-05-09 Thread Shay Elbaz
Hi all,

I apologize for reposting this from Stack Overflow, but it got very little 
attention and no comments.

I'm using Spark 3.2.1 image that was built from the official distribution via 
`docker-image-tool.sh', on Kubernetes 1.18 cluster.
Everything works fine, except for this error message on stdout every 90 seconds:

WARN WatcherWebSocketListener: Exec Failure
java.io.EOFException
at okio.RealBufferedSource.require(RealBufferedSource.java:61)
at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
at 
okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

This message does not affect the application, but it is really annoying, 
especially for Jupyter users. The lack of details makes it very hard to debug.
It appears on any submit variation - spark-submit, pyspark, spark-shell.
I've found traces of it on the internet, but all occurrences were from older 
versions of Spark and were resolved by using a "newer" version of fabric8 (4.x).
Spark 3.2.1 already uses fabric8 version 5.4.1.
I wonder if anyone else still sees this error in Spark 3.x, and has a 
resolution.

Thanks,
Shay.


RE: [EXTERNAL] Parse Execution Plan from PySpark

2022-05-03 Thread Shay Elbaz
Hi Pablo,

As you probably know, Spark SQL generates custom Java code for the SQL 
functions. You can use geometry.debugCodegen() to print out the generated code.
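
Relatedly, for the "internal representation" question in the quoted message 
below, one unofficial way to reach the plan objects from Python is through the 
private _jdf handle - a sketch, not a stable API:

```
plan = geometry._jdf.queryExecution().optimizedPlan()
print(plan.numberedTreeString())   # textual tree, as printed by explain()
print(plan.prettyJson())           # JSON form of the same tree, easier to parse programmatically
```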

Shay

From: Pablo Alcain 
Sent: Tuesday, May 3, 2022 6:07 AM
To: user@spark.apache.org
Subject: [EXTERNAL] Parse Execution Plan from PySpark

ATTENTION: This email originated from outside of GM.



Hello all! I'm working with PySpark trying to reproduce some of the results we 
see on batch through streaming processes, just as a PoC for now. For this, I'm 
thinking of trying to interpret the execution plan and eventually write it back 
to Python (I'm doing something similar with pandas as well, and I'd like both 
approaches to be as similar as possible).

Let me clarify with an example: suppose that starting with a `geometry.csv` 
file with `width` and `height` I want to calculate the `area` doing this:

>>> geometry = spark.read.csv('geometry.csv', header=True)
>>> geometry = geometry.withColumn('area', F.col('width') * F.col('height'))

I would like to extract from the execution plan the fact that area is 
calculated as the product of width * height. One possibility would be to parse 
the execution plan:

>>> geometry.explain(True)

...
== Optimized Logical Plan ==
Project [width#45, height#46, (cast(width#45 as double) * cast(height#46 as 
double)) AS area#64]
+- Relation [width#45,height#46] csv
...

From the first line of the Logical Plan we can parse the formula "area = height 
* width" and then write the function back in any language.

However, even though I'm getting the logical plan as a string, there has to be 
some internal representation that I could leverage and avoid the string 
parsing. Do you know if/how I can access that internal representation from 
Python? I've been trying to navigate the scala source code to find it, but this 
is definitely beyond my area of expertise, so any pointers would be more than 
welcome.

Thanks in advance,
Pablo


This is a blog post explaining how to use a new Spark library, datafu-spark

2021-07-21 Thread Shay Elbaz
https://medium.com/paypal-tech/introducing-datafu-spark-ba67faf1933a


Introducing DataFu-Spark | by Eyal Allweil | Technology at PayPal | Jul, 2021 | Medium
DataFu-Spark is a new addition to Apache DataFu, based on an internal PayPal 
library. In this article, we explain how you can use it.