Re: Structured Streaming and Spark Connect

2024-09-23 Thread 刘唯
Hi Anastasiia,

Thanks for the email. I think you can tweak the Spark config
*spark.connect.session.manager.defaultSessionTimeout*, which is defined here:
https://github.com/apache/spark/blob/343471dac4b96b43a09763d759b6c30760fb626e/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala#L86-L93

Once the session timeout is increased, Spark won't kill the session and
therefore won't kill the query either.
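For reference, a minimal sketch of what this could look like on the server side (the timeout key is the one defined in the Connect.scala link above; the "40h" value is only an example and should simply exceed your longest trigger interval):

```python
# Sketch: server-side configs to keep a Connect session (and its streaming
# queries) alive across long trigger intervals. The timeout key is the one
# linked above; the values here are illustrative, not recommendations.
connect_server_conf = {
    # default is 60m, which matches the ~1 hour session expiry observed
    "spark.connect.session.manager.defaultSessionTimeout": "40h",
    # the internal conf Anastasiia already raised to avoid the "idle" warning
    "spark.sql.streaming.noDataProgressEventInterval": "24h",
}
```

These would typically be passed as `--conf key=value` pairs to `sbin/start-connect-server.sh` or set in `conf/spark-defaults.conf`.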

Thanks,
Wei Liu

Anastasiia Sokhova wrote on Mon, Sep 23, 2024 at 09:41:

> [original message quoted in full; trimmed here, see the thread-opening
> message below]


Re: Structured Streaming and Spark Connect

2024-09-23 Thread Mich Talebzadeh
Hi Anastasiia,

My take is that in its current form, Spark Connect is not suitable for
running long-lived Structured Streaming queries in Standalone mode,
especially with long trigger intervals. The lack of support for detached
streaming queries makes it problematic for this particular use case. To
make Structured Streaming work in Standalone mode, you could:


   1. Use spark-submit in cluster mode instead of Spark Connect.
   2. Consider alternative cluster managers like YARN or k8s for better
   driver management.
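As a sketch of option 1, the submission could look like the following (the master URL, class name, and jar path are placeholders; a JVM application is assumed, since standalone clusters do not support cluster deploy mode for Python applications):

```python
# Sketch of option 1 (spark-submit in cluster mode), assembled as an argv
# list. Master URL, class name, and jar path are placeholders. A JVM app is
# assumed because standalone clusters reject cluster mode for Python apps.
submit_cmd = [
    "spark-submit",
    "--master", "spark://master-host:7077",   # placeholder master URL
    "--deploy-mode", "cluster",               # driver survives client exit
    "--conf", "spark.sql.streaming.noDataProgressEventInterval=24h",
    "--class", "com.example.StreamingJob",    # placeholder main class
    "streaming-job.jar",                      # placeholder application jar
]
```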

Specific answers

q) Is Spark Connect intended to support “detached” Streaming Queries?

No, currently Spark Connect ties queries to the client session: streaming
queries stop when the session ends. Detached queries are not yet supported.

q) Could Streaming Queries be detached from the session, as they are
continuous?

This is a valid request. Detaching streaming queries would allow them to
run independently, ensuring long-running jobs don’t stop when the session
ends. This would require changes in Spark’s session management.

q) Would you extend control options in Spark Connect UI (start, stop, reset
checkpoints)?

Yes, adding controls to start, stop, or reset streaming queries would
improve usability, especially for production systems. This feature would
give users more dynamic management of long-running streaming jobs.

Have a look at this article of mine

Building an Event-Driven Real-Time Data Processor with Spark Structured
Streaming and API Integration
<https://www.linkedin.com/pulse/building-event-driven-real-time-data-processor-spark-mich-zy3ef/?trackingId=RIwY%2FePi0jslLiXqOP8mxQ%3D%3D>

HTH,

Mich Talebzadeh

Architect | Data Engineer | Data Science | Financial Crime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Mon, 23 Sept 2024 at 17:41, Anastasiia Sokhova wrote:

> [original message quoted in full; trimmed here, see the thread-opening
> message below]


Structured Streaming and Spark Connect

2024-09-23 Thread Anastasiia Sokhova
Dear Spark Team,

I am working with a standalone cluster, and I am using Spark Connect to submit 
my applications.
My current version is 3.5.1.

I am trying to run Structured Streaming Queries with relatively long trigger 
intervals (2 hours, 1 day).

The first issue I encountered was “Streaming query has been idle and waiting 
for new data more than 1ms”. I solved it by increasing the value in the 
internal config property  ‘spark.sql.streaming.noDataProgressEventInterval’.
Now my query is not considered idle anymore but Connect expires the session 
after ~1 hour, and the query is killed with it.

I believe I have studied everything I could find online, but I could not find 
the answers.
I would really appreciate it if you provided some 😊

Is it not intended for Spark Connect to support “detached” Streaming Queries?
Would you consider detaching StreamingQueries from the sessions that start 
them, as they are meant to run continuously?
Would you consider extending control options in Spark Connect UI (start, stop, 
reset checkpoints)?
It would help users like me, who want to use Spark’s Structured Streaming 
and Connect without running additional applications just to keep the session 
alive.

I will be happy to answer any question from your side or provide more details.

Best regards,
Anastasiia


Compatibility Issue: Spark 3.5.2 Schema Recognition vs. Spark 3.4.0 with Hive Metastore (Case Sensitivity)

2024-09-21 Thread Mich Talebzadeh
I have seen this before on another release but cannot recall the details.

1. *Initial Setup with Spark 3.5.2*:
   - I was using *Spark 3.5.2* (a recent upgrade), *Beeline 2.3.9*, and *Hadoop
3.1.1*.
   - The issue encountered was that the schema list was not visible,
specifically when querying the database. The `SHOW TABLES` or
database-related commands did not return expected results.

2. *Issue Details*:
   - The error indicated that the schema `DS` could not be found, despite
the schema existing. This suggests a case sensitivity issue or a
compatibility mismatch between Spark 3.5.2 and Hive/Hadoop setup.

3. *Solution by Downgrading Spark:*
   - *Downgraded Spark from 3.5.2 to 3.4.0*.
   - After downgrading to Spark 3.4.0, the schema names worked when forced
to lowercase (e.g., using `ds` instead of `DS`), and I was able to
successfully query the databases and schemas.

   This suggests that Spark 3.5.2 may handle case sensitivity or schema
querying differently, which was incompatible with my setup. By downgrading
to 3.4.0 and switching to lowercase schema names, the issue was resolved.

*Summary:*
- *Spark 3.5.2* caused issues with recognizing schema names, possibly due
to case sensitivity or Hive metastore compatibility.
- *Downgrading to Spark 3.4.0 *and using *lowercase schema names* resolved
the issue, allowing the database and tables to be accessed correctly.

I guess it is a good reminder that version compatibility between Spark,
Hive, and Hadoop can sometimes cause issues, particularly with metastore
interactions and case sensitivity.
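The lowercase workaround is consistent with the Hive metastore storing database and table names in lowercase; a trivial illustration of the normalization involved (the 3.5.2 vs 3.4.0 behavior described above is my own observation, not something confirmed upstream):

```python
# Illustration only: the Hive metastore stores database/table names in
# lowercase, so lookups that preserve the caller's case can miss. The fix
# observed above was to normalize the schema name before querying.
def normalize_identifier(name: str) -> str:
    """Mimic metastore-style lowercasing of an unquoted identifier."""
    return name.lower()

# 'DS' could not be found, but it resolved once queried as 'ds'
schema = normalize_identifier("DS")
```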

HTH

Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh





Spark SQL readSideCharPadding issue while reading ENUM column from mysql

2024-09-21 Thread Suyash Ajmera
I have upgraded my Spark job from 3.3.1 to 3.5.0. I am querying a MySQL
database and applying

`*UPPER(col) = UPPER(value)*` in the subsequent SQL query. It works as
expected in Spark 3.3.1, but not with 3.5.0.

Where Condition ::  `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR
upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*`

The *st* column is an ENUM in the database, and it is causing the issue.

Below is the Physical Plan of *FILTER* phase :

For 3.3.1 :

+- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR
(upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))

For 3.5.0 :

+- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = OPEN) OR
(upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR
(upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = CLOSED)))

-----

I have debugged it and found that Spark added a property in version 3.4.0,
i.e. **spark.sql.readSideCharPadding**, which has the default value **true**.

Link to the JIRA : https://issues.apache.org/jira/browse/SPARK-40697

It added a new method to the class **CharVarcharCodegenUtils**:

public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
  int numChars = inputStr.numChars();
  if (numChars == limit) {
    return inputStr;
  } else if (numChars < limit) {
    return inputStr.rpad(limit, SPACE);
  } else {
    return inputStr;
  }
}


**This method appends whitespace padding to the ENUM values while reading,
causing the issue.**
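To see why the padding breaks the comparison, here is a plain-Python sketch of the same logic (right-pad with spaces up to the CHAR width, 13 in the plan above):

```python
def read_side_padding(s: str, limit: int) -> str:
    """Plain-Python sketch of CharVarcharCodegenUtils.readSidePadding:
    right-pad with spaces up to `limit` chars; longer strings pass through."""
    return s.ljust(limit) if len(s) < limit else s

padded = read_side_padding("OPEN", 13)
# 9 trailing spaces are added, so the padded value no longer equals 'OPEN',
# while trimming the padding first would make it match again.
```

This suggests why `upper(st) = 'OPEN'` finds nothing in 3.5.0: the padded value compares as `'OPEN         '`. A possible workaround, if feasible in your query, is to trim before comparing, e.g. `UPPER(RTRIM(st)) = 'OPEN'`, though I have not verified how that interacts with filter pushdown.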

---

When I remove the UPPER function from the where condition, the
**FILTER** phase looks like this:

 +- Filter (((staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
 StringType, readSidePadding, st#42, 13, true, false, true) = OPEN
) OR (staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true) = REOPEN   )) OR
(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true) = CLOSED   ))


**You can see it has added some whitespace after the value, and the query
runs fine, giving the correct result.**

But with the UPPER function I am not getting the data.

--

I have also tried disabling this property, *spark.sql.readSideCharPadding =
false*, with the following cases:

1. With the UPPER function in the where clause:
   It does not push the filters to the database, and the *query works fine*.

  +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR
(upper(st#42) = CLOSED))

2. But when I remove the UPPER function:

 *It pushes the filter to MySQL with the whitespace, and I am not
getting the data. (THIS IS CAUSING A VERY BIG ISSUE)*

  PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON),
*Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN
)),EqualTo(st,CLOSED   ))]

I cannot move this filter to the JDBC read query, and I can't remove the
UPPER function from the where clause.



Also, I found the same data getting written to CASSANDRA with *padding*.


[Spark Core]: Spark 3.5.0 incompatible with Embedded Kafka

2024-09-10 Thread Adesh Dsilva
Hi

I have been using Embedded Kafka in my batch job to test whether my output is
correct.
It works correctly up to Spark 3.4.3 but fails with Spark 3.5.0.
I have also tried different versions of Embedded Kafka, up to the latest
version.


What might be the problem?
Is this something that needs to be fixed in Spark or Embedded Kafka?

I am using this library:
https://github.com/embeddedkafka/embedded-kafka

This is the exception I get:
io.github.embeddedkafka.KafkaUnavailableException was thrown.
io.github.embeddedkafka.KafkaUnavailableException
at
io.github.embeddedkafka.ops.ProducerOps.publishToKafka(ProducerOps.scala:219)
at
io.github.embeddedkafka.ops.ProducerOps.publishToKafka(ProducerOps.scala:82)
at
io.github.embeddedkafka.ops.ProducerOps.publishToKafka$(ProducerOps.scala:72)
at MySpec.publishToKafka(MySpec.scala:5)

Please help. Thank you

Kind regards,
Adesh


Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?

2024-09-09 Thread Nagatomi Yasukazu
I apologize if my previous explanation was unclear, and I realize I didn’t
provide enough context for my question.

The reason I want to submit a Spark application to a Kubernetes cluster
using the Spark Operator is that I want to use Kubernetes as the Cluster
Manager, rather than Standalone mode. This would allow the Spark Connect
Driver and Executors to run on different nodes within the Kubernetes
cluster.

I understand that it is currently possible to launch Spark Connect by
setting the Cluster Manager to Standalone. However, in that case, the
Driver and Executors run on the same node, which I believe would not scale
efficiently.

Therefore, I am considering specifying Kubernetes (specifically the
Kubernetes API) as the Cluster Manager to dynamically distribute and
schedule the Driver and multiple Executor Pods across all nodes in the
Kubernetes cluster.

With the Spark Operator, it is easy to specify Kubernetes as the Cluster
Manager, but the Spark Operator does not allow the use of the "client"
deploy mode (i.e., the deploy mode will be "cluster"). On the other hand,
Spark Connect does not support the "cluster" deploy mode, leading to a
deadlock between the two specifications.

That is why I wanted to understand the reason why Spark Connect does not
allow the "cluster" deploy mode, and this was the main point of my original
question.

On Tue, Sep 10, 2024 at 0:29, Prabodh Agarwal wrote:

> [earlier messages quoted in full; trimmed here, see the individual
> messages below]


Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?

2024-09-09 Thread Prabodh Agarwal
Oh. This issue is pretty straightforward to solve actually. Particularly,
in spark-3.5.2.

Just download the `spark-connect` maven jar and place it in
`$SPARK_HOME/jars`. Then rebuild the docker image. I saw that I had posted
a comment on this Jira as well. I could fix this up for standalone cluster
at least this way.

On Mon, Sep 9, 2024 at 7:04 PM Nagatomi Yasukazu 
wrote:

> [earlier messages quoted in full; trimmed here, see the individual
> messages below]


Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?

2024-09-09 Thread Nagatomi Yasukazu
Hi Prabodh,

Thank you for your response.

As you can see from the following JIRA issue, it is possible to run the
Spark Connect Driver on Kubernetes:

https://issues.apache.org/jira/browse/SPARK-45769

However, this issue describes a problem that occurs when the Driver and
Executors are running on different nodes. This could potentially be the
reason why only Standalone mode is currently supported, but I am not
certain about it.

Thank you for your attention.


On Mon, Sep 9, 2024 at 12:40, Prabodh Agarwal wrote:

> My 2 cents regarding my experience with using spark connect in cluster
> mode.
>
> 1. Create a spark cluster of 2 or more nodes. Make 1 node as master &
> other nodes as workers. Deploy spark connect pointing to the master node.
> This works well. The approach is not well documented, but I could figure
> it out by trial and error.
> 2. In k8s, by default; we can actually get the executors to run on
> kubernetes itself. That is pretty straightforward, but the driver continues
> to run on a local machine. But yeah, I agree as well, making the driver to
> run on k8s itself would be slick.
>
> Thank you.
>
>
> On Mon, Sep 9, 2024 at 6:17 AM Nagatomi Yasukazu 
> wrote:
>
>> [original message quoted in full; trimmed here, see the thread-opening
>> message below]


Spark 3.2.1 vs Spark 3.5.2

2024-09-08 Thread Stephen Coy
Hi everyone,

We are migrating our ETL tasks from Spark 3.2.1 (Java 11) to Spark 3.5.2 (Java 
17).

One of these applications that works fine on 3.2.1 completely kills our cluster
on 3.5.2.

The clusters consist of five 256GB workers and a 256GB master.

The task is run with "--executor-memory 200G" and is completed in about 15 
minutes on 3.2.1.

However, when I run with "--executor-memory 200G" on 3.5.2, the workers all 
die eventually because the worker is unable to allocate more shared memory (as 
far as I can tell, because they have to be rebooted). I then tried with 
"--executor-memory 100G". This chugs along for about half an hour and then runs 
out of disk space (/tmp/ has about 125GB) for shared memory.

The 3.2.1 Physical Plan is 11268 lines.
The 3.5.2 Physical Plan is 12923 lines.

All the consumed data consists of parquet files that live on S3 and are 
accessed using the s3a protocol configured as:

spark.hadoop.fs.s3a.aws.credentials.provider 
org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider

# Enables the hadoop s3a committer
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a 
org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory

spark.hadoop.fs.s3a.threads.max 40
spark.hadoop.fs.s3a.connection.maximum 40

The query itself is basically:


final var catalogParts = partsSelectorA.selectParts()
.union(partsSelectorB.selectParts())
.union(partsSelectorC.selectParts())
.union(partsSelectorD.selectParts())
.distinct()
.persist();

This is followed by some further “lightweight" unions that can be ignored as I 
have tried excluding these with no effect.

Each “selectParts()” method is a select statement on a huge table (~156M rows) 
combined with a half dozen or more left joins with large (~3M rows)  tables.

I’m considering trying the 3.5.3RC which resolves some left join issues.

Any ideas?

I can share more details privately if that can help.

Regards,

Steve Coy








[CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?

2024-09-08 Thread Nagatomi Yasukazu
Hi All,

Why is it not possible to specify cluster as the deploy mode for Spark
Connect?

As discussed in the following thread, it appears that there is an
"arbitrary decision" within spark-submit that "Cluster mode is not
applicable" to Spark Connect.

GitHub Issue Comment:
https://github.com/kubeflow/spark-operator/issues/1801#issuecomment-2000494607

> This will circumvent the submission error you may have gotten if you
tried to just run the SparkConnectServer directly. From my investigation,
that looks to be an arbitrary decision within spark-submit that Cluster
mode is "not applicable" to SparkConnect. Which is sort of true except when
using this operator :)

I have reviewed the following commit and pull request, but I could not find
any discussion or reason explaining why cluster mode is not available:

Related Commit:
https://github.com/apache/spark/commit/11260310f65e1a30f6b00b380350e414609c5fd4

Related Pull Request:
https://github.com/apache/spark/pull/39928

This restriction poses a significant obstacle when trying to use Spark
Connect with the Spark Operator. If there is a technical reason for this, I
would like to know more about it. Additionally, if this issue is being
tracked on JIRA or elsewhere, I would appreciate it if you could provide a
link.

Thank you in advance.

Best regards,
Yasukazu Nagatomi


Re: Spark Thrift Server - Not Scaling Down Executors 3.4.2+

2024-09-05 Thread Cheng Pan
The default value of spark.dynamicAllocation.shuffleTracking.enabled was 
changed from false to true in Spark 3.4.0, disabling it might help.

[1] 
https://spark.apache.org/docs/latest/core-migration-guide.html#upgrading-from-core-33-to-34
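A sketch of what that might look like for the Thrift Server (values are illustrative; note that with shuffle tracking disabled, dynamic allocation generally needs the external shuffle service instead):

```python
# Sketch: reverting to the pre-3.4 dynamic-allocation behaviour for STS.
# Values are illustrative, not recommendations.
sts_conf = {
    "spark.dynamicAllocation.enabled": "true",
    # default flipped to true in 3.4.0 (see the migration guide above)
    "spark.dynamicAllocation.shuffleTracking.enabled": "false",
    # with shuffle tracking off, an external shuffle service is required
    # for dynamic allocation to release executors safely
    "spark.shuffle.service.enabled": "true",
}
```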

Thanks,
Cheng Pan



> On Sep 6, 2024, at 00:36, Jayabindu Singh  wrote:
> 
> [original message quoted in full; trimmed here, see the message below]





Spark Thrift Server - Not Scaling Down Executors 3.4.2+

2024-09-05 Thread Jayabindu Singh
Dear Spark Users,

We have run into an issue: with Spark 3.3.2, auto scaling with STS works
fine, but with 3.4.2 or 3.5.2, executors are being left behind and not
scaling down.
The driver makes a call to remove the executor, but some (not all)
executors never get removed.

Has anyone else noticed this, or is anyone aware of any reported issues?

Any help will be greatly appreciated.

Regards
Jay


Re: unable to deploy Pyspark application on GKE, Spark installed using bitnami helm chart

2024-08-27 Thread Mat Schaffer
I use https://github.com/kubeflow/spark-operator rather than the bitnami
chart, but https://medium.com/@kayvan.sol2/spark-on-kubernetes-d566158186c6
shows running spark-submit from a master pod exec. Might be something to try.
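A rough sketch of that approach (the pod name, namespace, service name, and script path are placeholders for a typical bitnami release; adjust to yours):

```python
# Sketch: running spark-submit by exec-ing into the master pod, as the
# linked article does. Pod name, namespace, master service, and script
# path are placeholders for a typical bitnami install.
exec_cmd = [
    "kubectl", "exec", "-n", "spark", "spark-master-0", "--",
    "spark-submit",
    "--master", "spark://spark-master-svc:7077",  # in-cluster service name
    "--deploy-mode", "client",  # cluster mode is rejected for Python apps
    "/opt/app/StructuredStream-on-gke.py",        # placeholder script path
]
```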

On Mon, Aug 26, 2024 at 12:22 PM karan alang  wrote:

> We are currently using Dataproc on GCP for running our Spark workloads,
> and I'm planning to move this workload to Kubernetes (GKE).
>
> Here is what is done so far :
>
> Installed Spark using bitnami helm chart:
>
> ```
>
> helm repo add bitnami https://charts.bitnami.com/bitnami
>
> helm install spark -f sparkConfig.yaml bitnami/spark -n spark
>
> ```
>
> Also, deployed a loadbalancer, yaml used :
>
> ```
>
> apiVersion: v1kind: Servicemetadata:
>   name: spark-master-lb
>   labels:
> app: spark
> component: LoadBalancerspec:
>   selector:
> app.kubernetes.io/component: master
> app.kubernetes.io/instance: spark
> app.kubernetes.io/name: spark
>   ports:
>   - name: webui
> port: 8080
> targetPort: 8080
>   - name: master
> port: 7077
> targetPort: 7077
>   type: LoadBalancer
>
> ```
>
> Spark is installed, and the pods have come up.
>
> When i try to do a spark-submit in cluster mode, it gives following error:
>
> ```
>
> (base) Karans-MacBook-Pro:fromEdward-jan26 karanalang$ $SPARK_HOME/bin/spark-submit --master spark://:7077 --deploy-mode cluster --name spark-on-gke local:///Users/karanalang/Documents/Technology/0.spark-on-gke/StructuredStream-on-gke.py
> 24/08/26 12:03:26 WARN Utils: Your hostname, Karans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.42.28.138 instead (on interface en0)
> 24/08/26 12:03:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/karanalang/Documents/Technology/spark-3.1.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
> WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Exception in thread "main" org.apache.spark.SparkException: Cluster deploy 
> mode is currently not supported for python applications on standalone 
> clusters.
> at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:968)
> at 
> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:273)
> at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> ```
>
> In client mode, it gives the following error :
>
> 24/08/26 12:06:58 ERROR SparkContext: Error initializing SparkContext.
> java.lang.NullPointerException
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:640)
> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
> at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
> at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:238)
> at 
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 24/08/26 12:06:58 INFO SparkContext: SparkContext already stopped.
>
> Couple of questions :
>
>1.
>
>is using the helm chart the correct 

unable to deploy Pyspark application on GKE, Spark installed using bitnami helm chart

2024-08-26 Thread karan alang
We are currently using Dataproc on GCP for running our Spark workloads, and
I'm planning to move this workload to Kubernetes (GKE).

Here is what is done so far :

Installed Spark using bitnami helm chart:

```

helm repo add bitnami https://charts.bitnami.com/bitnami

helm install spark -f sparkConfig.yaml bitnami/spark -n spark

```

Also, deployed a loadbalancer, yaml used :

```

apiVersion: v1
kind: Service
metadata:
  name: spark-master-lb
  labels:
    app: spark
    component: LoadBalancer
spec:
  selector:
    app.kubernetes.io/component: master
    app.kubernetes.io/instance: spark
    app.kubernetes.io/name: spark
  ports:
  - name: webui
    port: 8080
    targetPort: 8080
  - name: master
    port: 7077
    targetPort: 7077
  type: LoadBalancer

```

Spark is installed, and the pods have come up.

When I try to do a spark-submit in cluster mode, it gives the following error:

```

(base) Karans-MacBook-Pro:fromEdward-jan26 karanalang$ $SPARK_HOME/bin/spark-submit --master spark://:7077 --deploy-mode cluster --name spark-on-gke local:///Users/karanalang/Documents/Technology/0.spark-on-gke/StructuredStream-on-gke.py
24/08/26 12:03:26 WARN Utils: Your hostname, Karans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.42.28.138 instead (on interface en0)
24/08/26 12:03:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/karanalang/Documents/Technology/spark-3.1.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" org.apache.spark.SparkException: Cluster
deploy mode is currently not supported for python applications on
standalone clusters.
at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:968)
at 
org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:273)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

```

In client mode, it gives the following error :

24/08/26 12:06:58 ERROR SparkContext: Error initializing SparkContext.
java.lang.NullPointerException
at org.apache.spark.SparkContext.<init>(SparkContext.scala:640)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at 
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
24/08/26 12:06:58 INFO SparkContext: SparkContext already stopped.

Couple of questions :

   1. Is using the helm chart the correct way to install Apache Spark on
      GKE/k8s? (Note - need to install on both GKE and on-prem Kubernetes.)
   2. How to submit PySpark jobs on a Spark cluster deployed on GKE (e.g. do I
      need to create a K8s deployment for each Spark job?)

tia !

Here is the stackoverflow link :

https://stackoverflow.com/questions/78915988/unable-to-deploy-pyspark-application-on-gke-spark-installed-using-bitnami-helm
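One note on the cluster-mode error above: the standalone scheduler (spark://...) does not support cluster deploy mode for Python applications, but the Kubernetes scheduler (k8s://...) does. A hedged sketch of what the submit command could look like against the GKE API server follows; the API server host, namespace, container image, service account, and script path are all placeholders, not values from the original message. The command is built as a Python argument list here just for illustration.

```python
# Illustrative spark-submit invocation for PySpark in cluster mode on Kubernetes.
# Note the k8s:// master URL (the Kubernetes API server), not a standalone spark:// URL.
# All bracketed values are placeholders to adapt to your cluster.
submit_cmd = [
    "spark-submit",
    "--master", "k8s://https://<k8s-apiserver-host>:443",
    "--deploy-mode", "cluster",
    "--name", "spark-on-gke",
    "--conf", "spark.kubernetes.namespace=spark",
    "--conf", "spark.kubernetes.container.image=<your-spark-py-image>",
    "--conf", "spark.kubernetes.authenticate.driver.serviceAccountName=spark",
    # In cluster mode the script must be reachable from inside the container image.
    "local:///opt/app/StructuredStream-on-gke.py",
]
print(" ".join(submit_cmd))
```

With this approach each spark-submit creates its own driver pod, so a separate K8s Deployment per job is not required.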


Job Opportunities in India or UK with Tier 2 Sponsorship - Spark Expert

2024-08-26 Thread sri hari kali charan Tummala
Hi Spark Community,

I'm a seasoned Data Engineering professional with 13+ years of experience
and expertise in Apache Spark, particularly in Structured Streaming. I'm
looking for job opportunities in India or the UK that offer Tier 2
sponsorship. If anyone knows of openings or can connect me with potential
employers, please reach out.

Thanks,
Sri Tummala


Re: Spark Reads from MapR and Write to MinIO fails for few batches

2024-08-24 Thread Prem Sahoo
Issue resolved, thanks for your time folks.

Sent from my iPhone




Expected Release date for Spark 4?

2024-08-23 Thread Vikash Lavaniya
Hi,

We've been working with the Spark 4 preview for a while and would like to
know when Spark 4 will be released.

Could you provide a planned release date for Spark 4 to help us plan
better?

Regards,
Vikash


Re: Spark Reads from MapR and Write to MinIO fails for few batches

2024-08-21 Thread Prem Sahoo
Hello Team,
Could you please check on this request ?

On Mon, Aug 19, 2024 at 7:00 PM Prem Sahoo  wrote:

> Hello Spark and User,
> could you please shed some light ?
>
> On Thu, Aug 15, 2024 at 7:15 PM Prem Sahoo  wrote:
>
>> Hello Spark and User,
>> we have a  Spark project which is a long running Spark session where  it
>> does below
>> 1. We are reading from  Mapr FS and writing to MapR FS.
>> 2. Another parallel job which reads from MapR Fs and Writes to MinIO
>> object storage.
>>
>> We are finding issues for a few batches of Spark jobs which one writes to
>> MinIO , reads empty data frame/dataset from MapR but the job which reads
>> from  & writes to  MapR Fs for the same batches never had any issue.
>>
>> I was just going through some blogs and stackoverflow to know that Spark
>> Session which holds both information /config of MapR and Minio sometimes
>> find this issue as Spark Session or context has no correct  information so
>> either we need to clear or restart spark session for each batch.
>>
>>
>> Please let me know if you have any suggestions to get rid of this issue.
>>
>>


Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-20 Thread Cheng Pan
I searched [1] using the keywords “reliable” and got nothing, so I cannot draw 
the same conclusion as you.

If an implementation claims to support reliable storage, it should implement
the ShuffleDriverComponents interface and override the supportsReliableStorage
method [2] to return true; see, for example, Apache Celeborn [3], a remote
shuffle service for Spark.

Thanks,
Cheng Pan

[1] 
https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage
[2] 
https://github.com/apache/spark/blob/v3.5.2/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java#L65-L72
[3] 
https://github.com/apache/celeborn/blob/v0.5.1/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/CelebornShuffleDataIO.java#L56

> On Aug 20, 2024, at 18:42, Aaron Grubb  wrote:
> 
> Hi Cheng,
> 
> Due to the documentation [1]. This is why I suggested at the end of the 
> message you replied to that documentation should be updated or
> clarified. Can you explain how persistent volume claims in Kubernetes are 
> "unreliable" storage?
> 
> Thanks,
> Aaron
> 
> https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage
> 
> On Tue, 2024-08-20 at 18:37 +0800, Cheng Pan wrote:
>> org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO does NOT support 
>> reliable storage, so the condition 4) is false even with this
>> configuration.
>> I’m not sure why you think it does.
>> 
>> Thanks,
>> Cheng Pan
>> 
>>> On Aug 20, 2024, at 18:27, Aaron Grubb  wrote:
>>> 
>>> Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the 
>>> job however it still was not stable in the face of spot
>>> instances
>>> going away. Adding spark.decommission.enabled=true, 
>>> spark.storage.decommission.enabled=true and
>>> spark.executor.decommission.killInterval=110
>>> appears to have completely stabilized the job (not sure which did the trick 
>>> as I added them at the same time). Perhaps extra
>>> documentation or
>>> clarifications should be added as it doesn't seem clear to me how to 
>>> arrivate at job stability using dynamic allocation without trial and
>>> error.
>>> 
>>> On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote:
>>>> Hi all,
>>>> 
>>>> I'm running Spark on Kubernetes on AWS using only spot instances for 
>>>> executors with dynamic allocation enabled. This particular job is
>>>> being
>>>> triggered by Airflow and it hit this bug [1] 6 times in a row. However, I 
>>>> had recently switched to using PersistentVolumeClaims in
>>>> Spark
>>>> with
>>>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>>>  but kept
>>>> spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see 
>>>> under the notes for spark.dynamicAllocation.enabled [2] that
>>>> these
>>>> configurations are "or" not "and". However, when setting 
>>>> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with 
>>>> the
>>>> message
>>>> 
>>>> org.apache.spark.SparkException: Dynamic allocation of executors requires 
>>>> one of the following conditions: 1) enabling external shuffle
>>>> service through spark.shuffle.service.enabled. 2) enabling shuffle 
>>>> tracking through spark.dynamicAllocation.shuffleTracking.enabled. 3)
>>>> enabling shuffle blocks decommission through spark.decommission.enabled 
>>>> and spark.storage.decommission.shuffleBlocks.enabled. 4)
>>>> (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a 
>>>> custom ShuffleDataIO who's ShuffleDriverComponents supports
>>>> reliable
>>>> storage.
>>>> 
>>>> Am I hitting this bug unavoidably? Or is there a configuration I'm missing 
>>>> to enable
>>>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>>>  to replace
>>>> spark.dynamicAllocation.shuffleTracking.enabled=true?
>>>> 
>>>> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>>>> 
>>>> spark.checkpoint.compress  true
>>>> spark.driver.cores  1

Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-20 Thread Cheng Pan
org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO does NOT support 
reliable storage, so the condition 4) is false even with this configuration.
I’m not sure why you think it does.

Thanks,
Cheng Pan

> On Aug 20, 2024, at 18:27, Aaron Grubb  wrote:
> 
> Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job 
> however it still was not stable in the face of spot instances
> going away. Adding spark.decommission.enabled=true, 
> spark.storage.decommission.enabled=true and 
> spark.executor.decommission.killInterval=110
> appears to have completely stabilized the job (not sure which did the trick 
> as I added them at the same time). Perhaps extra documentation or
> clarifications should be added as it doesn't seem clear to me how to arrivate 
> at job stability using dynamic allocation without trial and
> error.
> 
> On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote:
>> Hi all,
>> 
>> I'm running Spark on Kubernetes on AWS using only spot instances for 
>> executors with dynamic allocation enabled. This particular job is
>> being
>> triggered by Airflow and it hit this bug [1] 6 times in a row. However, I 
>> had recently switched to using PersistentVolumeClaims in Spark
>> with
>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>  but kept
>> spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see 
>> under the notes for spark.dynamicAllocation.enabled [2] that these
>> configurations are "or" not "and". However, when setting 
>> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with 
>> the
>> message
>> 
>> org.apache.spark.SparkException: Dynamic allocation of executors requires 
>> one of the following conditions: 1) enabling external shuffle
>> service through spark.shuffle.service.enabled. 2) enabling shuffle tracking 
>> through spark.dynamicAllocation.shuffleTracking.enabled. 3)
>> enabling shuffle blocks decommission through spark.decommission.enabled and 
>> spark.storage.decommission.shuffleBlocks.enabled. 4)
>> (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a 
>> custom ShuffleDataIO who's ShuffleDriverComponents supports reliable
>> storage.
>> 
>> Am I hitting this bug unavoidably? Or is there a configuration I'm missing 
>> to enable
>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>  to replace
>> spark.dynamicAllocation.shuffleTracking.enabled=true?
>> 
>> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>> 
>> spark.checkpoint.compress  true
>> spark.driver.cores  1
>> spark.driver.maxResultSize  2g
>> spark.driver.memory  5140m
>> spark.dynamicAllocation.enabled  true
>> spark.dynamicAllocation.executorAllocationRatio  0.33
>> spark.dynamicAllocation.maxExecutors  20
>> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout  30
>> spark.eventLog.enabled  true
>> spark.executor.cores  3
>> spark.executor.logs.rolling.enableCompression  true
>> spark.executor.logs.rolling.maxRetainedFiles  48
>> spark.executor.logs.rolling.strategy  time
>> spark.executor.logs.rolling.time.interval  hourly
>> spark.hadoop.fs.s3a.impl  org.apache.hadoop.fs.s3a.S3AFileSystem
>> spark.hadoop.fs.s3a.connection.ssl.enabled  false
>> spark.hadoop.fs.s3a.fast.upload  true
>> spark.kryo.registratio

Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-20 Thread Aaron Grubb
Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job 
however it still was not stable in the face of spot instances
going away. Adding spark.decommission.enabled=true, 
spark.storage.decommission.enabled=true and 
spark.executor.decommission.killInterval=110
appears to have completely stabilized the job (not sure which did the trick as
I added them at the same time). Perhaps extra documentation or
clarifications should be added, as it doesn't seem clear how to arrive
at job stability using dynamic allocation without trial and
error.

On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote:
> Hi all,
>
> I'm running Spark on Kubernetes on AWS using only spot instances for 
> executors with dynamic allocation enabled. This particular job is
> being
> triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had 
> recently switched to using PersistentVolumeClaims in Spark
> with
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>  but kept
> spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see 
> under the notes for spark.dynamicAllocation.enabled [2] that these
> configurations are "or" not "and". However, when setting 
> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the
> message
>
> org.apache.spark.SparkException: Dynamic allocation of executors requires one 
> of the following conditions: 1) enabling external shuffle
> service through spark.shuffle.service.enabled. 2) enabling shuffle tracking 
> through spark.dynamicAllocation.shuffleTracking.enabled. 3)
> enabling shuffle blocks decommission through spark.decommission.enabled and 
> spark.storage.decommission.shuffleBlocks.enabled. 4)
> (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom 
> ShuffleDataIO who's ShuffleDriverComponents supports reliable
> storage.
>
> Am I hitting this bug unavoidably? Or is there a configuration I'm missing to 
> enable
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>  to replace
> spark.dynamicAllocation.shuffleTracking.enabled=true?
>
> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>
> spark.checkpoint.compress  true
> spark.driver.cores  1
> spark.driver.maxResultSize  2g
> spark.driver.memory  5140m
> spark.dynamicAllocation.enabled  true
> spark.dynamicAllocation.executorAllocationRatio  0.33
> spark.dynamicAllocation.maxExecutors  20
> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout  30
> spark.eventLog.enabled  true
> spark.executor.cores  3
> spark.executor.logs.rolling.enableCompression  true
> spark.executor.logs.rolling.maxRetainedFiles  48
> spark.executor.logs.rolling.strategy  time
> spark.executor.logs.rolling.time.interval  hourly
> spark.hadoop.fs.s3a.impl  org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.fs.s3a.connection.ssl.enabled  false
> spark.hadoop.fs.s3a.fast.upload  true
> spark.kryo.registrationRequired  false
> spark.kryo.unsafe  false
> spark.kryoserializer.buffer  1m
> spark.kryoserializer.buffer.max  1g
> spark.kubernetes.driver.limit.cores  750m
> spark.kubernetes.driver.ownPersistentVolumeClaim
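The decommission-based settings Aaron reports as stabilizing the job can be collected as follows. This is a sketch for illustration only, shown as a plain Python dict (in practice they would go in spark-defaults.conf or be passed via --conf); the killInterval of 110 seconds is the value from his message, and the error message's condition 3) additionally names spark.storage.decommission.shuffleBlocks.enabled, which may need to be set explicitly depending on your version's defaults.

```python
# Settings from Aaron's message that enable shuffle-block decommissioning
# (condition 3 in the SparkException), collected for illustration.
decommission_conf = {
    "spark.decommission.enabled": "true",
    "spark.storage.decommission.enabled": "true",
    # Seconds before a decommissioning executor is killed; tune to your
    # spot-instance termination notice window.
    "spark.executor.decommission.killInterval": "110",
}
for key, value in sorted(decommission_conf.items()):
    print(f"{key}={value}")
```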

Re: Spark Reads from MapR and Write to MinIO fails for few batches

2024-08-19 Thread Prem Sahoo
Hello Spark and User,
could you please shed some light ?

On Thu, Aug 15, 2024 at 7:15 PM Prem Sahoo  wrote:

> Hello Spark and User,
> we have a  Spark project which is a long running Spark session where  it
> does below
> 1. We are reading from  Mapr FS and writing to MapR FS.
> 2. Another parallel job which reads from MapR Fs and Writes to MinIO
> object storage.
>
> We are finding issues for a few batches of Spark jobs which one writes to
> MinIO , reads empty data frame/dataset from MapR but the job which reads
> from  & writes to  MapR Fs for the same batches never had any issue.
>
> I was just going through some blogs and stackoverflow to know that Spark
> Session which holds both information /config of MapR and Minio sometimes
> find this issue as Spark Session or context has no correct  information so
> either we need to clear or restart spark session for each batch.
>
>
> Please let me know if you have any suggestions to get rid of this issue.
>
>


Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-19 Thread Aaron Grubb
Hi all,

I'm running Spark on Kubernetes on AWS using only spot instances for executors 
with dynamic allocation enabled. This particular job is being
triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had 
recently switched to using PersistentVolumeClaims in Spark with
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
 but kept
spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see under 
the notes for spark.dynamicAllocation.enabled [2] that these
configurations are "or" not "and". However, when setting 
spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the
message

org.apache.spark.SparkException: Dynamic allocation of executors requires one 
of the following conditions: 1) enabling external shuffle
service through spark.shuffle.service.enabled. 2) enabling shuffle tracking 
through spark.dynamicAllocation.shuffleTracking.enabled. 3)
enabling shuffle blocks decommission through spark.decommission.enabled and 
spark.storage.decommission.shuffleBlocks.enabled. 4)
(Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom 
ShuffleDataIO who's ShuffleDriverComponents supports reliable
storage.

Am I hitting this bug unavoidably? Or is there a configuration I'm missing to 
enable
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
 to replace
spark.dynamicAllocation.shuffleTracking.enabled=true?

Using Spark 3.5.1 - here's my full spark-defaults.conf just in case

spark.checkpoint.compress  true
spark.driver.cores  1
spark.driver.maxResultSize  2g
spark.driver.memory  5140m
spark.dynamicAllocation.enabled  true
spark.dynamicAllocation.executorAllocationRatio  0.33
spark.dynamicAllocation.maxExecutors  20
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout  30
spark.eventLog.enabled  true
spark.executor.cores  3
spark.executor.logs.rolling.enableCompression  true
spark.executor.logs.rolling.maxRetainedFiles  48
spark.executor.logs.rolling.strategy  time
spark.executor.logs.rolling.time.interval  hourly
spark.hadoop.fs.s3a.impl  org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.connection.ssl.enabled  false
spark.hadoop.fs.s3a.fast.upload  true
spark.kryo.registrationRequired  false
spark.kryo.unsafe  false
spark.kryoserializer.buffer  1m
spark.kryoserializer.buffer.max  1g
spark.kubernetes.driver.limit.cores  750m
spark.kubernetes.driver.ownPersistentVolumeClaim  true
spark.kubernetes.driver.request.cores  750m
spark.kubernetes.driver.reusePersistentVolumeClaim  true
spark.kubernetes.driver.waitToReusePersistentVolumeClaim  true
spark.kubernetes.executor.limit.cores  3700m
spark.kubernetes.executor.request.cores  3700m
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName  OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path  /data/spark-x/executor-x
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly  false
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit  20Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-

Spark Job Fails while writing data to a S3 location in Parquet

2024-08-18 Thread Nipuna Shantha
I am trying to read .csv.gz file in S3 and write it to S3 in Parquet format
through a Spark job. Currently I am using AWS EMR service for this. Spark
Job is execute as a step in EMR cluster. For some .csv.gz files I have
encounter below issue. I have used both spark 3.4 and 3.5 versions and
still getting same error.

> > Exception in thread "main" java.lang.Exception:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 63
> in stage 4.0 failed 4 times, most recent failure: Lost task 63.3 in stage
> 4.0 (TID 71) (ip-100-68-72-50.880639474967.ase1.aws.dev.r53 executor 4):
> org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while
> writing rows to s3:///TEMP/.
> > at
> org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
> > at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
> > at
> org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
> > at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
> > at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
> > at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
> > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
> > at
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> > at org.apache.spark.scheduler.Task.run(Task.scala:141)
> > at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
> > at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> > at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> > at java.base/java.lang.Thread.run(Thread.java:840)
> > Caused by: java.io.EOFException: Unexpected end of input stream
> > at
> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
> > at
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
> > at
> nl.basjes.hadoop.io.compress.SplittableGzipCodec$SplittableGzipInputStream.read(SplittableGzipCodec.java:468)
> > at java.base/java.io.InputStream.read(InputStream.java:218)
> > at
> org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:132)
> > at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
> > at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
> > at
> org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:163)
> > at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:200)
> > at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> > at
> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
> > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
> > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> > at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:239)
> > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> > at
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
> > at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:404)
> > at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
> > at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411)
> > ... 15 more

Driver stacktrace:
> at
> com.lseg.lsa.pipeline.core.MIngestionUtilities$.validateInitialDataframe(MIngestionUtilities.scala:205)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation(DataIngestion.scala:22)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation$(DataIngestion.scala:21)
> at
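The `Caused by: java.io.EOFException: Unexpected end of input stream` in the stack trace above is what Hadoop's gzip decompressor raises when a .gz member ends early, i.e. the object is truncated or was only partially uploaded. A quick, Spark-independent way to pre-screen suspect objects is to stream-decompress them with Python's gzip module, which raises EOFError on a truncated stream. The sketch below operates on local file paths; for S3 you would first download or stream the object (e.g. via boto3), which is left out here.

```python
import gzip

def gzip_is_complete(path, chunk_size=1 << 20):
    """Return True if the .gz file decompresses to the end without error.

    A truncated gzip stream raises EOFError; a corrupt header or bad CRC
    raises OSError (gzip.BadGzipFile is a subclass of OSError).
    """
    try:
        with gzip.open(path, "rb") as fh:
            while fh.read(chunk_size):  # decompress in chunks, discard output
                pass
        return True
    except (EOFError, OSError):
        return False
```

Running this over the input prefix before the EMR step starts would let the job skip or quarantine bad files instead of failing the whole stage after four task retries.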

Spark Reads from MapR and Write to MinIO fails for few batches

2024-08-15 Thread Prem Sahoo
Hello Spark and User,
we have a Spark project which is a long-running Spark session where it does
the below:
1. We are reading from MapR FS and writing to MapR FS.
2. Another parallel job reads from MapR FS and writes to MinIO object
storage.

We are finding issues for a few batches of Spark jobs: the one that writes to
MinIO reads an empty data frame/dataset from MapR, but the job which reads
from and writes to MapR FS for the same batches never had any issue.

I was going through some blogs and Stack Overflow, which suggest that a Spark
session holding the information/config of both MapR and MinIO sometimes hits
this issue because the Spark session or context no longer has the correct
information, so we either need to clear or restart the Spark session for each
batch.

Please let me know if you have any suggestions to get rid of this issue.


[Spark Connect ] Date Data type formatting issue

2024-08-14 Thread Ilango
Hi all,

I am experiencing a date field formatting issue when loading data from a
Hive table in Spark via Spark Connect (on an AWS EMR cluster) using the R
sparklyr package. The date field is converted to a char type, whereas the
same field is loaded as a date type when using our on-premise Spark with
YARN connected to the same on-premise Hive table.

Could you please advise on how to ensure that the date fields are loaded in
their native date type when using Spark Connect?

Thanks,
Elango


Re: [ANNOUNCE] Apache Spark 3.5.2 released

2024-08-12 Thread Xiao Li
Thank you, Kent!

Kent Yao  于2024年8月12日周一 08:03写道:

> We are happy to announce the availability of Apache Spark 3.5.2!
>
> Spark 3.5.2 is the second maintenance release containing security
> and correctness fixes. This release is based on the branch-3.5
> maintenance branch of Spark. We strongly recommend all 3.5 users
> to upgrade to this stable release.
>
> To download Spark 3.5.2, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-5-2.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
>
> Kent Yao
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[ANNOUNCE] Apache Spark 3.5.2 released

2024-08-12 Thread Kent Yao
We are happy to announce the availability of Apache Spark 3.5.2!

Spark 3.5.2 is the second maintenance release containing security
and correctness fixes. This release is based on the branch-3.5
maintenance branch of Spark. We strongly recommend all 3.5 users
to upgrade to this stable release.

To download Spark 3.5.2, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-5-2.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Kent Yao

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Prabodh Agarwal
Glad to help!

On Tue, 6 Aug, 2024, 17:37 Ilango,  wrote:

>
> Thanks Prabodh. Passing the --master attr in the spark connect command worked
> like a charm. I am able to submit jobs via Spark Connect to my existing
> stand-alone cluster
>
> Thanks for saving my day once again :)
>
> Thanks,
> Elango
>
>
> On Tue, 6 Aug 2024 at 6:08 PM, Prabodh Agarwal 
> wrote:
>
>> Do you get some error on passing the master option to your spark connect
>> command?
>>
>> On Tue, 6 Aug, 2024, 15:36 Ilango,  wrote:
>>
>>>
>>>
>>>
>>> Thanks Prabodh. I'm having an issue with the Spark Connect connection as
>>> the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas
>>> the actual master node for our Spark standalone cluster is different. I am
>>> passing that master node ip in the Spark Connect Connection. But still it
>>> is not set correctly. Could you please help me update this configuration to
>>> reflect the correct master node value?
>>>
>>>
>>>
>>> This is my spark connect connection
>>>
>>>
>>>
>>> spark = SparkSession.builder\
>>>
>>> .remote("sc://:15002")\
>>>
>>> .getOrCreate()
>>>
>>>
>>> Thanks,
>>> Elango
>>>
>>>
>>> On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal 
>>> wrote:
>>>
>>>> There is an executors tab on spark connect. It's contents are generally
>>>> similar to the workers section of the spark master ui.
>>>>
>>>> You might need to specify --master option in your spark connect command
>>>> if you haven't done so yet.
>>>>
>>>> On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:
>>>>
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I am evaluating the use of Spark Connect with my Spark stand-alone
>>>>> cluster, which has a master node and 3 worker nodes. I have successfully
>>>>> created a Spark Connect connection. However, when submitting Spark SQL
>>>>> queries, the jobs are being executed only on the master node, and I do not
>>>>> observe any executors running on the worker nodes, despite requesting 4
>>>>> executors.
>>>>>
>>>>>
>>>>>
>>>>> I would appreciate clarification on whether Spark stand-alone cluster
>>>>> is supported for use with Spark Connect.
>>>>>
>>>>> If so, how can I leverage the existing Spark stand-alone cluster's
>>>>> worker nodes?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Elango
>>>>>
>>>>


Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Ilango
Thanks Prabodh. Passing the --master attr in the spark connect command worked
like a charm. I am able to submit jobs via Spark Connect to my existing
stand-alone cluster.

Thanks for saving my day once again :)

Thanks,
Elango


On Tue, 6 Aug 2024 at 6:08 PM, Prabodh Agarwal 
wrote:

> Do you get some error on passing the master option to your spark connect
> command?
>
> On Tue, 6 Aug, 2024, 15:36 Ilango,  wrote:
>
>>
>>
>>
>> Thanks Prabodh. I'm having an issue with the Spark Connect connection as
>> the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas
>> the actual master node for our Spark standalone cluster is different. I am
>> passing that master node ip in the Spark Connect Connection. But still it
>> is not set correctly. Could you please help me update this configuration to
>> reflect the correct master node value?
>>
>>
>>
>> This is my spark connect connection
>>
>>
>>
>> spark = SparkSession.builder\
>>
>> .remote("sc://:15002")\
>>
>>     .getOrCreate()
>>
>>
>> Thanks,
>> Elango
>>
>>
>> On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal 
>> wrote:
>>
>>> There is an executors tab on spark connect. It's contents are generally
>>> similar to the workers section of the spark master ui.
>>>
>>> You might need to specify --master option in your spark connect command
>>> if you haven't done so yet.
>>>
>>> On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:
>>>
>>>>
>>>> Hi all,
>>>>
>>>> I am evaluating the use of Spark Connect with my Spark stand-alone
>>>> cluster, which has a master node and 3 worker nodes. I have successfully
>>>> created a Spark Connect connection. However, when submitting Spark SQL
>>>> queries, the jobs are being executed only on the master node, and I do not
>>>> observe any executors running on the worker nodes, despite requesting 4
>>>> executors.
>>>>
>>>>
>>>>
>>>> I would appreciate clarification on whether Spark stand-alone cluster
>>>> is supported for use with Spark Connect.
>>>>
>>>> If so, how can I leverage the existing Spark stand-alone cluster's
>>>> worker nodes?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Thanks,
>>>> Elango
>>>>
>>>


Re: Spark 3.5.0 bug - Writing a small parquet dataframe to storage using spark 3.5.0 taking too long

2024-08-06 Thread Bijoy Deb
Hi Spark community,

Any resolution would be highly appreciated.

Few additional analysis from my side:

The lag in writing Parquet exists in Spark 3.5.0, but there is no lag in
Spark 3.1.2 or 2.4.5.

Also, I found out that the task WholeStageCodeGen(1) --> ColumnarToRow is
the one taking the most time (almost 3 mins for a simple 3 MB file) in
Spark 3.5.0. The input batch size of this stage is 10, and the output
record count is 30,000. The same ColumnarToRow task in Spark 3.1.2
finishes in 10 secs.
Further, with Spark 3.5.0, if I cache the dataframe, materialise it using
df.count(), and then write the df into a parquet file, then ColumnarToRow
gets called twice; the first takes 10 secs and the second one 3 mins.

On Wed, 31 Jul, 2024, 10:14 PM Bijoy Deb,  wrote:

> Hi,
>
> We are using Spark on-premise to simply read a parquet file from
> GCS(Google Cloud storage) into the DataFrame and write the DataFrame into
> another folder in parquet format in GCS, using below code:
>
> 
>
> DFS_BLOCKSIZE = 512 * 1024 * 1024
>
>
> spark = SparkSession.builder \
> .appName("test_app_parquet_load") \
>  .config("spark.master", "spark://spark-master-svc:7077") \
> .config("spark.driver.maxResultSize", '1g') \
> .config("spark.driver.memory", '1g') \
>  .config("spark.executor.cores",4) \
> .config("spark.sql.shuffle.partitions", 16) \
>.config("spark.sql.files.maxPartitionBytes", DFS_BLOCKSIZE) \
>
>  .getOrCreate()
>
>
> folder="gs://input_folder/input1/key=20240610"
> print(f"reading parquet from {folder}")
>
> start_time1 = time.time()
>
> data_df = spark.read.parquet(folder)
>
> end_time1 = time.time()
> print(f"Time duration for reading parquet t1: {end_time1 - start_time1}")
>
>
> start_time2 = time.time()
>
> data_df.write.mode("overwrite").parquet("gs://output_folder/output/key=20240610")
>
> end_time2 = time.time()
> print(f"Time duration for writing parquet t3: {end_time2 - start_time2}")
>
> spark.stop()
>
>
>
>
>
>
>
>
> ______
>
>
> However, we observed a drastic time difference between Spark 2.4.5 and
> 3.5.0 in the writing process. Even in the case of a local filesystem
> instead of GCS, Spark 3.5.0 takes a long time.
>
> In Spark 2.4.5, the above code takes about 10 seconds for the Parquet read
> and 20 seconds for the write, while in Spark 3.5.0 the read takes a similar
> time but the write takes nearly 3 minutes. The size of the file is just 3 MB.
> Further, we have noticed that if we read a CSV file instead of Parquet into
> a DataFrame and write it to another folder in Parquet format, Spark 3.5.0
> takes relatively less time to write, about 30-40 seconds.
>
> So, it looks like only reading a parquet file to a dataframe and writing
> that dataframe to another parquet file is taking too long in the case of
> Spark 3.5.0.
>
> We are seeing that there is no slowness even with Spark 3.1.2. So, it
> seems that the issue with spark job taking too long to write a parquet
> based dataframe into another parquet file (in gcs or local filesystem both)
> is specific to spark 3.5.0. Looks to be either a potential bug in Spark
> 3.5.0 or some parquet related configuration that is not clearly documented.
> Any help in this regard would be highly appreciated.
>
>
> Thanks,
>
> Bijoy
>
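Since the slow stage reported above is ColumnarToRow (which is fed by the vectorized Parquet reader), one quick diagnostic is to re-run the same job with the vectorized reader disabled and compare timings. This only narrows down where the regression lives; it is not a fix, and the setting below is a standard Spark SQL config, not something mentioned in the thread:

```properties
# Diagnostic only: disable the vectorized (columnar) Parquet reader,
# which is what feeds the ColumnarToRow stage, and re-measure the write.
spark.sql.parquet.enableVectorizedReader=false
```

If the 3-minute write disappears with this setting, that points at the columnar read path in 3.5.0 rather than the Parquet writer itself.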


Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Prabodh Agarwal
Do you get some error on passing the master option to your spark connect
command?

On Tue, 6 Aug, 2024, 15:36 Ilango,  wrote:

>
>
>
> Thanks Prabodh. I'm having an issue with the Spark Connect connection as
> the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas
> the actual master node for our Spark standalone cluster is different. I am
> passing that master node ip in the Spark Connect Connection. But still it
> is not set correctly. Could you please help me update this configuration to
> reflect the correct master node value?
>
>
>
> This is my spark connect connection
>
>
>
> spark = SparkSession.builder\
>
> .remote("sc://:15002")\
>
> .getOrCreate()
>
>
> Thanks,
> Elango
>
>
> On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal 
> wrote:
>
>> There is an executors tab on spark connect. It's contents are generally
>> similar to the workers section of the spark master ui.
>>
>> You might need to specify --master option in your spark connect command
>> if you haven't done so yet.
>>
>> On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:
>>
>>>
>>> Hi all,
>>>
>>> I am evaluating the use of Spark Connect with my Spark stand-alone
>>> cluster, which has a master node and 3 worker nodes. I have successfully
>>> created a Spark Connect connection. However, when submitting Spark SQL
>>> queries, the jobs are being executed only on the master node, and I do not
>>> observe any executors running on the worker nodes, despite requesting 4
>>> executors.
>>>
>>>
>>>
>>> I would appreciate clarification on whether Spark stand-alone cluster is
>>> supported for use with Spark Connect.
>>>
>>> If so, how can I leverage the existing Spark stand-alone cluster's
>>> worker nodes?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Thanks,
>>> Elango
>>>
>>


Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Ilango
Thanks Prabodh. I'm having an issue with the Spark Connect connection as
the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas
the actual master node for our Spark standalone cluster is different. I am
passing that master node ip in the Spark Connect Connection. But still it
is not set correctly. Could you please help me update this configuration to
reflect the correct master node value?



This is my spark connect connection



spark = SparkSession.builder\

.remote("sc://:15002")\

.getOrCreate()


Thanks,
Elango


On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal 
wrote:

> There is an executors tab on spark connect. It's contents are generally
> similar to the workers section of the spark master ui.
>
> You might need to specify --master option in your spark connect command if
> you haven't done so yet.
>
> On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:
>
>>
>> Hi all,
>>
>> I am evaluating the use of Spark Connect with my Spark stand-alone
>> cluster, which has a master node and 3 worker nodes. I have successfully
>> created a Spark Connect connection. However, when submitting Spark SQL
>> queries, the jobs are being executed only on the master node, and I do not
>> observe any executors running on the worker nodes, despite requesting 4
>> executors.
>>
>>
>>
>> I would appreciate clarification on whether Spark stand-alone cluster is
>> supported for use with Spark Connect.
>>
>> If so, how can I leverage the existing Spark stand-alone cluster's worker
>> nodes?
>>
>>
>>
>>
>>
>>
>> Thanks,
>> Elango
>>
>


Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Prabodh Agarwal
There is an Executors tab in the Spark Connect UI. Its contents are generally
similar to the Workers section of the Spark master UI.

You might need to specify --master option in your spark connect command if
you haven't done so yet.

On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:

>
> Hi all,
>
> I am evaluating the use of Spark Connect with my Spark stand-alone
> cluster, which has a master node and 3 worker nodes. I have successfully
> created a Spark Connect connection. However, when submitting Spark SQL
> queries, the jobs are being executed only on the master node, and I do not
> observe any executors running on the worker nodes, despite requesting 4
> executors.
>
>
>
> I would appreciate clarification on whether Spark stand-alone cluster is
> supported for use with Spark Connect.
>
> If so, how can I leverage the existing Spark stand-alone cluster's worker
> nodes?
>
>
>
>
>
>
> Thanks,
> Elango
>


[spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Ilango
Hi all,

I am evaluating the use of Spark Connect with my Spark stand-alone cluster,
which has a master node and 3 worker nodes. I have successfully created a
Spark Connect connection. However, when submitting Spark SQL queries, the
jobs are being executed only on the master node, and I do not observe any
executors running on the worker nodes, despite requesting 4 executors.



I would appreciate clarification on whether Spark stand-alone cluster is
supported for use with Spark Connect.

If so, how can I leverage the existing Spark stand-alone cluster's worker
nodes?






Thanks,
Elango
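As the replies in this thread note, the Connect server must be told about the standalone master explicitly; otherwise it falls back to local[*] and no executors appear on the workers. A minimal sketch of the launch command follows; the master host, port, and executor count are placeholders, and the script only prints the command rather than starting a server:

```shell
# Placeholder master URL for a standalone cluster; substitute your master host.
MASTER_URL="spark://spark-master-host:7077"

# Without an explicit --master, start-connect-server.sh runs with local[*],
# which is why jobs execute only on the master node.
CMD="sbin/start-connect-server.sh --master $MASTER_URL --conf spark.executor.instances=4"
echo "$CMD"
```

The client-side `SparkSession.builder.remote("sc://host:15002")` call cannot change the server's master; the master must be set where the Connect server is started.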


Spark History CORS header ‘Access-Control-Allow-Origin’ missing

2024-08-06 Thread Thomas Mauran
Hello, 
I'm trying to access the Spark History UI through an Apache Knox proxy, but I get 
the following error: 

Cross-Origin Request Blocked: The Same Origin Policy disallows reading the 
remote resource at [ 
https://ithdpdev-ekleszcz01.cern.ch:18080/api/v1/applications?limit=2147483647&status=completed
 | 
https://:18080/api/v1/applications?limit=2147483647&status=completed
 ] . (Reason: CORS header ‘Access-Control-Allow-Origin’ missing). Status code: 
302. 

I can't find anything related to CORS in Spark; do you know how I could fix 
that? 
Someone suggested the following parameters, but they didn't work and I can't 
find any reference to them online: 



spark.hadoop.http.cross-origin.allowed-origins = * 

spark.hadoop.http.cross-origin.allowed-methods = GET, PUT, POST, OPTIONS, HEAD, 
DELETE 

spark.hadoop.http.cross-origin.allowed-headers = X-Requested-With, 
Content-Type, Accept, Origin, WWW-Authenticate, Accept-Encoding, 
Transfer-Encoding 

spark.hadoop.http.cross-origin.max-age = 180 
Thanks for your answer 
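The `spark.hadoop.http.cross-origin.*` keys quoted above are Hadoop daemon (HDFS/YARN `HttpServer2`) settings; the Spark UI and History Server run on Jetty and, as far as I know, do not read them. Spark's UIs instead take servlet filters via `spark.ui.filters`, with filter init parameters passed as `spark.<filter class>.param.<name>`. A sketch for the History Server's spark-defaults.conf, reusing Hadoop's `CrossOriginFilter` (the values are illustrative, and whether this also resolves the 302 redirect coming back through Knox is untested):

```properties
# Register Hadoop's CORS servlet filter on the Spark UI / History Server.
spark.ui.filters=org.apache.hadoop.security.http.CrossOriginFilter
# Init parameters follow the spark.<filter class>.param.<name> pattern.
spark.org.apache.hadoop.security.http.CrossOriginFilter.param.allowed-origins=*
spark.org.apache.hadoop.security.http.CrossOriginFilter.param.allowed-methods=GET,PUT,POST,OPTIONS,HEAD,DELETE
spark.org.apache.hadoop.security.http.CrossOriginFilter.param.allowed-headers=X-Requested-With,Content-Type,Accept,Origin
```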


Re: [Issue] Spark SQL - broadcast failure

2024-08-01 Thread Sudharshan V
Hi all,
Do we have any idea on this.

Thanks

On Tue, 23 Jul, 2024, 12:54 pm Sudharshan V, 
wrote:

> We removed the explicit broadcast for that particular table and it took
> longer since the join type changed from BHJ to SMJ.
>
> I wanted to understand how I can find out what went wrong with the
> broadcast now.
> How do I know the size of the table inside Spark memory?
>
> I have tried to cache the table, hoping I could see the table size in the
> storage tab of the Spark UI on EMR.
>
> But I see no data there.
>
> Thanks
>
> On Tue, 23 Jul, 2024, 12:48 pm Sudharshan V, 
> wrote:
>
>> Hi all, apologies for the delayed response.
>>
>> We are using spark version 3.4.1 in jar and EMR 6.11 runtime.
>>
>> We have disabled the auto broadcast always and would broadcast the
>> smaller tables using explicit broadcast.
>>
>> It was working fine historically and only now it is failing.
>>
>> The data sizes I mentioned was taken from S3.
>>
>> Thanks,
>> Sudharshan
>>
>> On Wed, 17 Jul, 2024, 1:53 am Meena Rajani, 
>> wrote:
>>
>>> Can you try disabling broadcast join and see what happens?
>>>
>>> On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Been facing a weird issue lately.
>>>> In our production code base, we have an explicit broadcast for a small
>>>> table.
>>>> It is just a lookup table that is around 1 GB in size in S3 and has
>>>> just a few million records and 5 columns.
>>>>
>>>> The ETL was running fine, but with no change to the codebase or the
>>>> infrastructure, we are getting broadcast failures. Even weirder, the
>>>> older size of the data was 1.4 GB while the new run is just 900 MB.
>>>>
>>>> Below is the error message
>>>> Cannot broadcast table that is larger than 8 GB : 8GB.
>>>>
>>>> I find it extremely weird considering that the data size is very well
>>>> under the thresholds.
>>>>
>>>> Are there any other ways to find what could be the issue and how we can
>>>> rectify this issue?
>>>>
>>>> Could the data characteristics be an issue?
>>>>
>>>> Any help would be immensely appreciated.
>>>>
>>>> Thanks
>>>>
>>>
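One plausible explanation for hitting the 8 GB error on a ~900 MB file: the cap applies to Spark's estimated *in-memory* size of the broadcast relation built on the driver, not to the compressed size in S3, and row data decoded from snappy-compressed columnar Parquet can be an order of magnitude larger. A back-of-the-envelope sketch, where the expansion factor is hypothetical and depends entirely on the data's characteristics:

```python
# Spark hard-caps broadcast tables at 8 GB of *estimated in-memory* size.
# The on-disk size in S3 is compressed columnar Parquet; once decoded into
# rows for the broadcast hash relation, the same data can expand sharply.
on_disk_gb = 0.9           # ~900 MB in S3, per the thread
expansion_factor = 10.0    # hypothetical; depends on data characteristics
estimated_in_memory_gb = on_disk_gb * expansion_factor

# Even a "small" file can exceed the cap after decoding:
print(estimated_in_memory_gb > 8)  # prints: True
```

This would also be consistent with "data characteristics" mattering: a shift toward less compressible or wider values can blow the in-memory estimate without the S3 size growing.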


A code change for spark ui in Sql tab

2024-07-30 Thread Donvi
Hi, Community
I'm a new onboarder in the Spark community and found a feature gap between Spark and 
DBR in the Spark UI. This is in DBR for the cost-based optimizer in the Spark UI: 
https://docs.databricks.com/en/optimizations/cbo.html#spark-sql-ui. To implement 
something similar in open source, I've implemented it in 
https://github.com/apache/spark/pull/47534. I've searched JIRA but found nothing 
useful, so I'm sending this email to confirm whether such functionality is wanted 
in open source and what I can do to get this change checked in.


Thanks,
Weihan Tang


Re: [Spark Connect] connection issue

2024-07-29 Thread Prabodh Agarwal
Glad it worked!

On Tue, 30 Jul, 2024, 11:12 Ilango,  wrote:

>
> Thanks Prabodh. I copied the spark connect jar to the $SPARK_HOME/jars
> folder and passed the location as the --jars attr. It's working now. I could
> submit spark jobs via spark connect.
>
> Really appreciate the help.
>
>
>
> Thanks,
> Elango
>
>
> On Tue, 30 Jul 2024 at 11:05 AM, Prabodh Agarwal 
> wrote:
>
>> Yeah. I understand the problem. One of the ways is to actually place the
>> spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark
>> connect. Using the `--packages` or the `--jars` option is flaky in case of
>> spark connect.
>>
>> You can instead manually place the relevant spark connect jar file in the
>> `$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars`
>> option from your start command.
>>
>> On Mon, Jul 29, 2024 at 7:01 PM Ilango  wrote:
>>
>>>
>>> Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs
>>> path. It seems like the spark connect dependency issue. My spark node is
>>> air gapped node so no internet is allowed. Can I download the spark connect
>>> jar and pom files locally and share the local paths? How can I share the
>>> local jars ?
>>>
>>> Error message:
>>>
>>> :: problems summary ::
>>>
>>>  WARNINGS
>>>
>>> module not found:
>>> org.apache.spark#spark-connect_2.12;3.5.1
>>>
>>>
>>>
>>>  local-m2-cache: tried
>>>
>>>
>>>
>>>
>>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>
>>>
>>>
>>>   -- artifact
>>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>>
>>>
>>>
>>>
>>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>>
>>>
>>>
>>>      local-ivy-cache: tried
>>>
>>>
>>>
>>>
>>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml
>>>
>>>
>>>
>>>   -- artifact
>>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>>
>>>
>>>
>>>
>>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar
>>>
>>>
>>>
>>>  central: tried
>>>
>>>
>>>
>>>
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>
>>>
>>>
>>>   -- artifact
>>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>>
>>>
>>>
>>>
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>>
>>>
>>>
>>>  spark-packages: tried
>>>
>>>
>>>
>>>
>>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>
>>>
>>>
>>>   -- artifact
>>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>>
>>>
>>>
>>>
>>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>>
>>>
>>>
>>> ::::::
>>>
>>>
>>>
>>> ::  UNRESOLVED DEPENDENCIES ::
>>>
>>>
>>>
>>> ::::::
>>>
>>>
>>>
>>> :: org.apache.spark#spark-connect_2.12;3.5.1: not found
>>>
>>>
>>>
>>> ::
>>>
>>>
>>>
>>>
>>>
>>>  ERRORS
>>>
>>> Server access error at url
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>  (java.net.ConnectException:
>>> Connection timed out (Connection timed out))
>>>
>>>
>>>
>>> Server access error at url
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-co

Re: [Spark Connect] connection issue

2024-07-29 Thread Ilango
Thanks Prabodh. I copied the spark connect jar to the $SPARK_HOME/jars
folder and passed the location as the --jars attr. It's working now. I could
submit spark jobs via spark connect.

Really appreciate the help.



Thanks,
Elango


On Tue, 30 Jul 2024 at 11:05 AM, Prabodh Agarwal 
wrote:

> Yeah. I understand the problem. One of the ways is to actually place the
> spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark
> connect. Using the `--packages` or the `--jars` option is flaky in case of
> spark connect.
>
> You can instead manually place the relevant spark connect jar file in the
> `$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars`
> option from your start command.
>
> On Mon, Jul 29, 2024 at 7:01 PM Ilango  wrote:
>
>>
>> Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs
>> path. It seems like the spark connect dependency issue. My spark node is
>> air gapped node so no internet is allowed. Can I download the spark connect
>> jar and pom files locally and share the local paths? How can I share the
>> local jars ?
>>
>> Error message:
>>
>> :: problems summary ::
>>
>>  WARNINGS
>>
>> module not found:
>> org.apache.spark#spark-connect_2.12;3.5.1
>>
>>
>>
>>  local-m2-cache: tried
>>
>>
>>
>>
>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>
>>
>>
>>   -- artifact
>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>
>>
>>
>>
>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>
>>
>>
>>  local-ivy-cache: tried
>>
>>
>>
>>
>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml
>>
>>
>>
>>   -- artifact
>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>
>>
>>
>>
>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar
>>
>>
>>
>>  central: tried
>>
>>
>>
>>
>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>
>>
>>
>>   -- artifact
>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>
>>
>>
>>
>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>
>>
>>
>>  spark-packages: tried
>>
>>
>>
>>
>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>
>>
>>
>>   -- artifact
>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>
>>
>>
>>
>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>
>>
>>
>> ::
>>
>>
>>
>> ::  UNRESOLVED DEPENDENCIES     ::
>>
>>
>>
>> ::
>>
>>
>>
>> :: org.apache.spark#spark-connect_2.12;3.5.1: not found
>>
>>
>>
>> ::
>>
>>
>>
>>
>>
>>  ERRORS
>>
>> Server access error at url
>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>  (java.net.ConnectException:
>> Connection timed out (Connection timed out))
>>
>>
>>
>> Server access error at url
>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
>> Connection timed out (Connection timed out))
>>
>>
>>
>> Server access error at url
>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>  (java.net.ConnectException:
>> Connection timed out (Connection timed out))
>>
>>
>>
>> Server access error at url
>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
>> Connection timed out (Connection timed out))
>>
>>
>>
>>
>>
>

Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread Meena Rajani
You probably have to increase jvm/jdk memory size

https://stackoverflow.com/questions/1565388/increase-heap-size-in-java


On Mon, Jul 29, 2024 at 9:36 PM mike Jadoo  wrote:

> Thanks. I just downloaded Corretto, but I got this error message, which was
> the same as before. [It was shared with me that this says I have limited
> resources, I think.]
>
> ---Py4JJavaError
>  Traceback (most recent call last)
> Cell In[3], line 13  8 squared_rdd = rdd.map(lambda x: x * x) 10 # 
> Persist the DataFrame in memory 11 
> #squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the results 
> into a list---> 13 result = squared_rdd.collect() 15 # Print the result   
>   16 print(result)
>
> File 
> C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833, 
> in RDD.collect(self)   1831 with SCCallSiteSync(self.context):   1832 
> assert self.ctx._jvm is not None-> 1833 sock_info = 
> self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())   1834 return 
> list(_load_from_socket(sock_info, self._jrdd_deserializer))
>
> File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in 
> JavaMember.__call__(self, *args)   1316 command = proto.CALL_COMMAND_NAME +\  
>  1317 self.command_header +\   1318 args_command +\   1319 
> proto.END_COMMAND_PART   1321 answer = 
> self.gateway_client.send_command(command)-> 1322 return_value = 
> get_return_value(   1323 answer, self.gateway_client, self.target_id, 
> self.name)   1325 for temp_arg in temp_args:   1326 if hasattr(temp_arg, 
> "_detach"):
>
> File 
> C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179,
>  in capture_sql_exception..deco(*a, **kw)177 def deco(*a: Any, 
> **kw: Any) -> Any:178 try:--> 179 return f(*a, **kw)180   
>   except Py4JJavaError as e:181 converted = 
> convert_exception(e.java_exception)
>
> File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in 
> get_return_value(answer, gateway_client, target_id, name)324 value = 
> OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if answer[1] == 
> REFERENCE_TYPE:--> 326 raise Py4JJavaError(327 "An error 
> occurred while calling {0}{1}{2}.\n".328 format(target_id, ".", 
> name), value)329 else:330 raise Py4JError(331 "An 
> error occurred while calling {0}{1}{2}. Trace:\n{3}\n".332 
> format(target_id, ".", name, value))
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 
> in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 
> (TID 7) (mjadoo.myfiosgateway.com executor driver): java.io.IOException: 
> Cannot run program "C:\Users\mikej\AppData\Local\Programs\Python\Python312": 
> CreateProcess error=5, Access is denied
>   at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
>   at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
>   at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
>   at 
> org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
>   at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
>   at org.apache.spark.scheduler.Task.run(Task.scala:141)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
>   at 
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
>   at 
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(T
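Reading the traceback above, the root cause does not look like memory: `CreateProcess error=5, Access is denied` shows Spark trying to execute the Python *installation directory* (`...\Python312`) rather than `python.exe`. Pointing `PYSPARK_PYTHON` at the interpreter binary before creating the SparkSession usually resolves this on Windows. A sketch, where the path is taken from the traceback and must be adjusted to the actual machine:

```python
import os

# Spark spawns Python workers using PYSPARK_PYTHON; here it resolved to the
# install *directory*, which Windows refuses to execute (error=5).
# Point it at the interpreter executable instead (path is illustrative).
python_exe = r"C:\Users\mikej\AppData\Local\Programs\Python\Python312\python.exe"
os.environ["PYSPARK_PYTHON"] = python_exe
os.environ["PYSPARK_DRIVER_PYTHON"] = python_exe
```

These must be set before the SparkContext is created (or in the system environment variables) for the worker launch to pick them up.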

Re: [Spark Connect] connection issue

2024-07-29 Thread Prabodh Agarwal
Yeah. I understand the problem. One of the ways is to actually place the
spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark
connect. Using the `--packages` or the `--jars` option is flaky in case of
spark connect.

You can instead manually place the relevant spark connect jar file in the
`$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars`
option from your start command.
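The air-gapped workaround described here can be scripted roughly as follows; the transfer path is a placeholder, and the script only prints the commands rather than running them:

```shell
# Sketch of the jar-placement workaround (paths are placeholders).
SPARK_HOME="${SPARK_HOME:-/opt/spark}"
JAR="spark-connect_2.12-3.5.1.jar"

# 1. Download the jar on a machine with internet access and transfer it over.
# 2. Copy it into $SPARK_HOME/jars so it is on the server classpath:
echo "cp /media/transfer/$JAR $SPARK_HOME/jars/"
# 3. Start the Connect server WITHOUT --packages/--jars:
echo "$SPARK_HOME/sbin/start-connect-server.sh"
```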

On Mon, Jul 29, 2024 at 7:01 PM Ilango  wrote:

>
> Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs
> path. It seems like the spark connect dependency issue. My spark node is
> air gapped node so no internet is allowed. Can I download the spark connect
> jar and pom files locally and share the local paths? How can I share the
> local jars ?
>
> Error message:
>
> :: problems summary ::
>
>  WARNINGS
>
>     module not found: org.apache.spark#spark-connect_2.12;3.5.1
>
>
>
>  local-m2-cache: tried
>
>
>
>
> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>
>
>
>   -- artifact
> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>
>
>
>
> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>
>
>
>  local-ivy-cache: tried
>
>
>
>
> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml
>
>
>
>   -- artifact
> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>
>
>
>
> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar
>
>
>
>  central: tried
>
>
>
>
> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>
>
>
>   -- artifact
> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>
>
>
>
> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>
>
>
>  spark-packages: tried
>
>
>
>
> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>
>
>
>   -- artifact
> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>
>
>
>
> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>
>
>
> ::
>
>
>
> ::  UNRESOLVED DEPENDENCIES ::
>
>
>
> ::::::
>
>
>
> :: org.apache.spark#spark-connect_2.12;3.5.1: not found
>
>
>
> ::
>
>
>
>
>
> :::: ERRORS
>
> Server access error at url
> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>  (java.net.ConnectException:
> Connection timed out (Connection timed out))
>
>
>
> Server access error at url
> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
> Connection timed out (Connection timed out))
>
>
>
> Server access error at url
> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>  (java.net.ConnectException:
> Connection timed out (Connection timed out))
>
>
>
> Server access error at url
> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
> Connection timed out (Connection timed out))
>
>
>
>
>
> :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
>
> Exception in thread "main" java.lang.RuntimeException: [unresolved
> dependency: org.apache.spark#spark-connect_2.12;3.5.1: not found]
>
> at
> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1608)
>
> at
> org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)
>
> at
> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334)
>
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
>
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
>
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
>
> at
> org.apache.spark.deploy.Spark

Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread Sadha Chilukoori
Hi Mike,

This appears to be an access issue on Windows + Python. Can you try setting
the PYSPARK_PYTHON environment variable as described in this Stack Overflow
post:
https://stackoverflow.com/questions/60414394/createprocess-error-5-access-is-denied-pyspark
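
The commonly suggested fix for this error (an assumption about the linked post's content, but these are Spark's standard environment variables) is pointing Spark at a concrete Python executable rather than a directory. A minimal sketch:

```python
import os
import sys

# Point both the driver and the workers at a concrete Python executable.
# Using sys.executable avoids hard-coding a Windows path such as
# C:\Users\<you>\AppData\Local\Programs\Python\Python312\python.exe.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# These variables must be set before the SparkSession is created, e.g.:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate()
print(os.environ["PYSPARK_PYTHON"])
```

Note the error message in the thread shows Spark trying to execute the `Python312` folder itself, which is consistent with a misconfigured interpreter path.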

- Sadha

On Mon, Jul 29, 2024 at 3:39 PM mike Jadoo  wrote:

> Thanks.   I just downloaded the corretto  but I got this error message,
> which was the same as before. [It was shared with me that this saying that
> I have limited resources, i think]
>
> ---Py4JJavaError
>  Traceback (most recent call last)
> Cell In[3], line 13  8 squared_rdd = rdd.map(lambda x: x * x) 10 # 
> Persist the DataFrame in memory 11 
> #squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the results 
> into a list---> 13 result = squared_rdd.collect() 15 # Print the result   
>   16 print(result)
>
> File 
> C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833, 
> in RDD.collect(self)   1831 with SCCallSiteSync(self.context):   1832 
> assert self.ctx._jvm is not None-> 1833 sock_info = 
> self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())   1834 return 
> list(_load_from_socket(sock_info, self._jrdd_deserializer))
>
> File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in 
> JavaMember.__call__(self, *args)   1316 command = proto.CALL_COMMAND_NAME +\  
>  1317 self.command_header +\   1318 args_command +\   1319 
> proto.END_COMMAND_PART   1321 answer = 
> self.gateway_client.send_command(command)-> 1322 return_value = 
> get_return_value(   1323 answer, self.gateway_client, self.target_id, 
> self.name)   1325 for temp_arg in temp_args:   1326 if hasattr(temp_arg, 
> "_detach"):
>
> File 
> C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179,
>  in capture_sql_exception..deco(*a, **kw)177 def deco(*a: Any, 
> **kw: Any) -> Any:178 try:--> 179 return f(*a, **kw)180   
>   except Py4JJavaError as e:181 converted = 
> convert_exception(e.java_exception)
>
> File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in 
> get_return_value(answer, gateway_client, target_id, name)324 value = 
> OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if answer[1] == 
> REFERENCE_TYPE:--> 326 raise Py4JJavaError(327 "An error 
> occurred while calling {0}{1}{2}.\n".328 format(target_id, ".", 
> name), value)329 else:330 raise Py4JError(331 "An 
> error occurred while calling {0}{1}{2}. Trace:\n{3}\n".332 
> format(target_id, ".", name, value))
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 
> in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 
> (TID 7) (mjadoo.myfiosgateway.com executor driver): java.io.IOException: 
> Cannot run program "C:\Users\mikej\AppData\Local\Programs\Python\Python312": 
> CreateProcess error=5, Access is denied
>   at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
>   at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
>   at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
>   at 
> org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
>   at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
>   at org.apache.spark.scheduler.Task.run(Task.scala:141)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
>   at 
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
>   at 
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
>   at 
> java.base/java.util.concurrent.

Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread mike Jadoo
Thanks. I just downloaded Corretto, but I got this error message, which was
the same as before. [It was suggested to me that this means I have limited
resources, I think.]

---Py4JJavaError
Traceback (most recent call last)
Cell In[3], line 13  8 squared_rdd = rdd.map(lambda x: x * x)
10 # Persist the DataFrame in memory 11
#squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the
results into a list---> 13 result = squared_rdd.collect() 15 #
Print the result 16 print(result)

File 
C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833,
in RDD.collect(self)   1831 with SCCallSiteSync(self.context):   1832
   assert self.ctx._jvm is not None-> 1833 sock_info =
self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())   1834
return list(_load_from_socket(sock_info, self._jrdd_deserializer))

File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in
JavaMember.__call__(self, *args)   1316 command =
proto.CALL_COMMAND_NAME +\   1317 self.command_header +\   1318
 args_command +\   1319 proto.END_COMMAND_PART   1321 answer =
self.gateway_client.send_command(command)-> 1322 return_value =
get_return_value(   1323 answer, self.gateway_client,
self.target_id, self.name)   1325 for temp_arg in temp_args:   1326
 if hasattr(temp_arg, "_detach"):

File 
C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179,
in capture_sql_exception..deco(*a, **kw)177 def deco(*a:
Any, **kw: Any) -> Any:178 try:--> 179 return f(*a,
**kw)180 except Py4JJavaError as e:181 converted =
convert_exception(e.java_exception)

File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in
get_return_value(answer, gateway_client, target_id, name)324 value
= OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if
answer[1] == REFERENCE_TYPE:--> 326 raise Py4JJavaError(327
 "An error occurred while calling {0}{1}{2}.\n".328
format(target_id, ".", name), value)329 else:330 raise
Py4JError(331 "An error occurred while calling {0}{1}{2}.
Trace:\n{3}\n".332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0
in stage 0.0 (TID 7) (mjadoo.myfiosgateway.com executor driver):
java.io.IOException: Cannot run program
"C:\Users\mikej\AppData\Local\Programs\Python\Python312":
CreateProcess error=5, Access is denied
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at 
org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: CreateProcess error=5, Access is denied
at java.base/java.lang.ProcessImpl.create(Native Method)
at java.base/java.lang.ProcessImpl.(ProcessImpl.java:492)
at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:153)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
... 19 more

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2

Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread Sadha Chilukoori
Hi Mike,

I'm not sure about the minimum machine requirements for running Spark, but
to run some PySpark scripts (and Jupyter notebooks) on a local machine, I
found the following steps the easiest.


I installed Amazon Corretto and updated the JAVA_HOME variable as
instructed here:
https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list.html
(Any other Java works too; I'm used to Corretto from work.)

Then I installed the PySpark module using pip, which enabled me to run
PySpark on my machine.
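
A quick pre-flight check along those lines, before starting a local session, is to verify the JVM is visible the way a pip-installed PySpark expects. A sketch (the fallback paths and messages here are illustrative):

```python
import os
import shutil


def java_ready():
    """Check that a JVM is reachable: either JAVA_HOME points at an
    existing directory, or a `java` executable is on PATH."""
    java_home = os.environ.get("JAVA_HOME", "")
    if java_home and os.path.isdir(java_home):
        return True, f"JAVA_HOME={java_home}"
    java_bin = shutil.which("java")
    if java_bin:
        return True, f"java on PATH at {java_bin}"
    return False, "no JAVA_HOME and no java on PATH"


if __name__ == "__main__":
    ok, detail = java_ready()
    print(ok, detail)
    # With Java in place, `pip install pyspark` is enough to run local jobs:
    # from pyspark.sql import SparkSession
    # spark = SparkSession.builder.master("local[*]").getOrCreate()
```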

-Sadha

On Mon, Jul 29, 2024, 12:51 PM mike Jadoo  wrote:

> Hello,
>
> I am trying to run Pyspark on my computer without success.  I follow
> several different directions from online sources and it appears that I need
> to get a faster computer.
>
> I wanted to ask what are some recommendations for computer specifications
> to run PySpark (Apache Spark).
>
> Any help would be greatly appreciated.
>
> Thank you,
>
> Mike
>


Re: [Spark Connect] connection issue

2024-07-29 Thread Ilango
Thanks Prabodh. Yes, I can see the Spark Connect logs under the
$SPARK_HOME/logs path. It seems to be a Spark Connect dependency issue. My
Spark node is air-gapped, so no internet access is allowed. Can I download
the Spark Connect jar and pom files locally and point to the local paths?
How can I supply the local jars?

Error message:

:: problems summary ::

 WARNINGS

module not found: org.apache.spark#spark-connect_2.12;3.5.1



 local-m2-cache: tried




file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom



  -- artifact
org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:




file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar



 local-ivy-cache: tried




/root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml



  -- artifact
org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:




/root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar



 central: tried




https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom



  -- artifact
org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:




https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar



 spark-packages: tried




https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom



  -- artifact
org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:




https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar



::



::  UNRESOLVED DEPENDENCIES ::



::



:: org.apache.spark#spark-connect_2.12;3.5.1: not found



::





 ERRORS

Server access error at url
https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
(java.net.ConnectException:
Connection timed out (Connection timed out))



Server access error at url
https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
Connection timed out (Connection timed out))



Server access error at url
https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
(java.net.ConnectException:
Connection timed out (Connection timed out))



Server access error at url
https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
Connection timed out (Connection timed out))





:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS

Exception in thread "main" java.lang.RuntimeException: [unresolved
dependency: org.apache.spark#spark-connect_2.12;3.5.1: not found]

at
org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1608)

at
org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)

at
org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334)

at org.apache.spark.deploy.SparkSubmit.org
$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)

at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)

at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)

at
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)

at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)





Thanks,
Elango


On Mon, 29 Jul 2024 at 6:45 PM, Prabodh Agarwal 
wrote:

> The spark connect startup prints the log location. Is that not feasible
> for you?
> For me log comes to $SPARK_HOME/logs
>
> On Mon, 29 Jul, 2024, 15:30 Ilango,  wrote:
>
>>
>> Hi all,
>>
>>
>> I am facing issues with a Spark Connect application running on a Spark
>> standalone cluster (without YARN and HDFS). After executing the
>> start-connect-server.sh script with the specified packages, I observe a
>> process ID for a short period but am unable to see the corresponding port
>> (default 15002) associated with that PID. The process automatically stops
>> after around 10 minutes.
>>
>> Since the Spark History server is not enabled, I am unable to locate the
>> relevant logs or error messages. The logs for currently running Spark
>> applicatio

Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread mike Jadoo
Hello,

I am trying to run Pyspark on my computer without success.  I follow
several different directions from online sources and it appears that I need
to get a faster computer.

I wanted to ask what are some recommendations for computer specifications
to run PySpark (Apache Spark).

Any help would be greatly appreciated.

Thank you,

Mike


Re: [Spark Connect] connection issue

2024-07-29 Thread Prabodh Agarwal
The spark connect startup prints the log location. Is that not feasible for
you?
For me log comes to $SPARK_HOME/logs

On Mon, 29 Jul, 2024, 15:30 Ilango,  wrote:

>
> Hi all,
>
>
> I am facing issues with a Spark Connect application running on a Spark
> standalone cluster (without YARN and HDFS). After executing the
> start-connect-server.sh script with the specified packages, I observe a
> process ID for a short period but am unable to see the corresponding port
> (default 15002) associated with that PID. The process automatically stops
> after around 10 minutes.
>
> Since the Spark History server is not enabled, I am unable to locate the
> relevant logs or error messages. The logs for currently running Spark
> applications are accessible from the Spark UI, but I am unsure where to
> find the logs for the Spark Connect application and service.
>
> Could you please advise on where to find the logs or error messages
> related to Spark Connect?
>
>
>
>
> Thanks,
> Elango
>


[Spark Connect] connection issue

2024-07-29 Thread Ilango
Hi all,


I am facing issues with a Spark Connect application running on a Spark
standalone cluster (without YARN and HDFS). After executing the
start-connect-server.sh script with the specified packages, I observe a
process ID for a short period but am unable to see the corresponding port
(default 15002) associated with that PID. The process automatically stops
after around 10 minutes.

Since the Spark History server is not enabled, I am unable to locate the
relevant logs or error messages. The logs for currently running Spark
applications are accessible from the Spark UI, but I am unsure where to
find the logs for the Spark Connect application and service.

Could you please advise on where to find the logs or error messages related
to Spark Connect?




Thanks,
Elango


Re: [Issue] Spark SQL - broadcast failure

2024-07-23 Thread Sudharshan V
We removed the explicit broadcast for that particular table, and the job took
longer since the join type changed from a broadcast hash join (BHJ) to a
sort-merge join (SMJ).

I want to understand what went wrong with the broadcast.
How do I find out the size of the table inside Spark's memory?

I tried caching the table, hoping I could see its size in the Storage tab of
the Spark UI on EMR, but I see no data there.
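
One way to see the size Spark itself assigns to the table (which is what the broadcast size check uses, not the S3 file size) is the cost-mode plan: `df.explain(mode="cost")` in PySpark, or `EXPLAIN COST SELECT ...` in SQL, prints a `sizeInBytes` statistic per plan node. A small helper to pull those numbers out of the explain text (a sketch; the sample plan line is illustrative):

```python
import re

_SIZE = re.compile(r"sizeInBytes=([\d.]+)\s*(B|KiB|MiB|GiB|TiB)")
_UNIT = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4}


def size_estimates_bytes(cost_plan_text):
    """Extract every sizeInBytes statistic from the output of
    df.explain(mode='cost') / EXPLAIN COST, converted to bytes."""
    return [float(n) * _UNIT[u] for n, u in _SIZE.findall(cost_plan_text)]


# Usage on a real session (not run here; the S3 path is illustrative):
#   df = spark.read.parquet("s3://bucket/lookup_table/")
#   df.explain(mode="cost")   # prints Statistics(sizeInBytes=..., ...) per node

sample = "Statistics(sizeInBytes=900.0 MiB, rowCount=3.2E+6)"
print(size_estimates_bytes(sample))
```

Comparing the optimizer's `sizeInBytes` against the 8 GB broadcast limit can show whether Spark's in-memory estimate has drifted far from the compressed S3 size.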

Thanks

On Tue, 23 Jul, 2024, 12:48 pm Sudharshan V, 
wrote:

> Hi all, apologies for the delayed response.
>
> We are using spark version 3.4.1 in jar and EMR 6.11 runtime.
>
> We have disabled the auto broadcast always and would broadcast the smaller
> tables using explicit broadcast.
>
> It was working fine historically and only now it is failing.
>
> The data sizes I mentioned was taken from S3.
>
> Thanks,
> Sudharshan
>
> On Wed, 17 Jul, 2024, 1:53 am Meena Rajani, 
> wrote:
>
>> Can you try disabling broadcast join and see what happens?
>>
>> On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V 
>> wrote:
>>
>>> Hi all,
>>>
>>> Been facing a weird issue lately.
>>> In our production code base , we have an explicit broadcast for a small
>>> table.
>>> It is just a look up table that is around 1gb in size in s3 and just had
>>> few million records and 5 columns.
>>>
>>> The ETL was running fine , but with no change from the codebase nor the
>>> infrastructure, we are getting broadcast failures. Even weird fact is the
>>> older size of the data is 1.4gb while for the new run is just 900 MB
>>>
>>> Below is the error message
>>> Cannot broadcast table that is larger than 8 GB : 8GB.
>>>
>>> I find it extremely weird considering that the data size is very well
>>> under the thresholds.
>>>
>>> Are there any other ways to find what could be the issue and how we can
>>> rectify this issue?
>>>
>>> Could the data characteristics be an issue?
>>>
>>> Any help would be immensely appreciated.
>>>
>>> Thanks
>>>
>>


Re: [Issue] Spark SQL - broadcast failure

2024-07-23 Thread Sudharshan V
Hi all, apologies for the delayed response.

We are using Spark version 3.4.1 (in the jar) on the EMR 6.11 runtime.

We have always disabled auto broadcast and instead broadcast the smaller
tables with an explicit broadcast.

It was working fine historically; only now is it failing.

The data sizes I mentioned were taken from S3.

Thanks,
Sudharshan

On Wed, 17 Jul, 2024, 1:53 am Meena Rajani,  wrote:

> Can you try disabling broadcast join and see what happens?
>
> On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V 
> wrote:
>
>> Hi all,
>>
>> Been facing a weird issue lately.
>> In our production code base , we have an explicit broadcast for a small
>> table.
>> It is just a look up table that is around 1gb in size in s3 and just had
>> few million records and 5 columns.
>>
>> The ETL was running fine , but with no change from the codebase nor the
>> infrastructure, we are getting broadcast failures. Even weird fact is the
>> older size of the data is 1.4gb while for the new run is just 900 MB
>>
>> Below is the error message
>> Cannot broadcast table that is larger than 8 GB : 8GB.
>>
>> I find it extremely weird considering that the data size is very well
>> under the thresholds.
>>
>> Are there any other ways to find what could be the issue and how we can
>> rectify this issue?
>>
>> Could the data characteristics be an issue?
>>
>> Any help would be immensely appreciated.
>>
>> Thanks
>>
>


[Spark SQL]: Why the OptimizeSkewedJoin rule does not optimize FullOuterJoin?

2024-07-22 Thread 王仲轩(万章)
Hi,
I am a beginner in Spark and currently learning the Spark source code. I have a 
question about the AQE rule OptimizeSkewedJoin. 
I have a SQL query using SMJ FullOuterJoin, where there is read skew on the 
left side (the case is mentioned below). 
case:
remote bytes read total (min, med, max)
90.5 GiB [bytes:97189264140] (208.5 MiB [bytes:218673776], 210.0 MiB 
[bytes:220191607], 18.1 GiB [bytes:19467332173])
However, the OptimizeSkewedJoin rule does not optimize FullOuterJoin. I would 
like to know the reason behind this.
Thanks.


[spark connect] issue in testing spark connect

2024-07-19 Thread Ilango
Hi all,


I am currently using a Spark standalone cluster, which is functioning as
expected. Users are able to connect to the cluster and submit jobs without
any issues.



I am also testing the Spark Connect capability, which will enable external
clients to submit jobs to the cluster. To start the Spark Connect server, I
am running the command `/sbin/start-connect-server.sh --packages
org.apache.spark:spark-connect_2.12:3.5.1` on the Spark master node. The
command executes without any errors, suggesting that the Spark Connect
server is running successfully.



However, I am unable to access the Spark UI to verify the Spark Connect
server's status. Can someone please provide guidance on how to confirm that
Spark Connect is functioning properly?
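
One quick way to confirm the server is actually listening on the default port 15002 is a plain TCP probe from any machine that should be able to reach it (a sketch; substitute the master node's hostname):

```python
import socket


def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds.
    This only proves a listener exists, not that it speaks the Spark
    Connect gRPC protocol, but it separates 'server not running' from
    client-side configuration problems."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Hostname is an assumption; use the Spark master node here.
    print(port_open("localhost", 15002))
```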



Additionally, when attempting to run a Spark code snippet in Jupyter
Notebook to test Spark Connect, I encounter the following error. If someone
is familiar with this issue or could provide assistance in resolving it, I
would greatly appreciate it.



import pyspark
import pandas
import pyarrow
import grpc_status
import grpc
import torch
from pyspark.sql import SparkSession
import os

os.environ["SPARK_HOME"] = "/path/to/my/standalone/pyspark/cluster"
spark = SparkSession.builder.remote("sc://").getOrCreate()
#spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()



Error :
/pyspark/sql/connect/session.py:185: UserWarning: <_InactiveRpcError of RPC
that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNKNOWN:
ipv4::15002: Failed to connect to remote host: Connection refused"
debug_error_string = "UNKNOWN:Error received from peer
{grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4::15002: Failed to connect to remote host: Connection refused",
grpc_status:14, created_time:"2024-07-19T14:48:49.12739279+08:00"}"

warnings.warn(str(e))





Thanks,
Elango


Error on Spark history UI api when used with Apache Knox

2024-07-18 Thread thomas.mau...@etu.umontpellier.fr.INVALID
Hello,
I am securing my Spark History UI with Apache Knox and am having trouble with
the proxied Spark History UI, as mentioned in this thread:
https://lists.apache.org/thread/xwzo4g6qk2hqmn62t6p162b8t6l3hldq

I managed to edit the rewrite rules so the CSS loads on the proxied Spark
History UI, but I am now facing another problem I can't overcome.
When the page loads, the JS code triggers a jQuery call to the API:
`$.getJSON(uiRoot + "/api/v1/applications", appParams`
The problem is that when Knox is proxying the Spark History UI, this uiRoot is
not the right one: it is https://knox-host:8443/api/v1/applications instead of
https://spark-ui-host:18080/api/v1/applications.

Apache Knox rewrite rules don't seem to be able to fix that, so I am sending a
message here to see if any of you have a better idea, maybe something related
to the proxy setup.

I tried to set `spark.ui.proxyBase` to the Spark History UI URL and port so
that Knox tries to reach it, but I end up with a 302 error: `Cross-Origin
Request Blocked: The Same Origin Policy disallows reading the remote resource
at https://host:18080/api/v1/applications?limit=2147483647&status=completed.
(Reason: CORS header ‘Access-Control-Allow-Origin’ missing). Status code:
302.` I don't see anything related to CORS in the SHS configuration.


I would love any suggestions on why the Spark rewrite rules don't work out of
the box on Knox, or on how I could fix that API call.

Thank you very much for your responses

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



problem using spark 3.4 with spots

2024-07-18 Thread wafa gabouj
Hello,

I am working with Spark on Dataiku using spot instances on S3 (not
on-demand instances). I had no problem until I moved from Spark 3.3 to Spark
3.4! I always hit this failure and could not understand what configuration
change in the new version of Spark leads to it.

*org.apache.spark.SparkException: Job aborted due to stage failure:
Authorized committer (attemptNumber=0, stage=2, partition=357) failed; but
task commit success, data duplication may happen.*
Here is the full log

[09:46:42] [INFO] [dku.utils]  - [2024/07/17-09:46:42.680] [Thread-6]
[ERROR] [org.apache.spark.sql.execution.datasources.FileFormatWriter]
- Aborting job 2271622d-848a-4719-ad86-81d951235dbb.
[09:46:42] [INFO] [dku.utils]  - org.apache.spark.SparkException: Job
aborted due to stage failure: Authorized committer (attemptNumber=0,
stage=2, partition=715) failed; but task commit success, data
duplication may happen. reason=ExecutorLostFailure(1,false,Some(The
executor with id 1 was deleted by a user or the framework.))
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
~[scala-library-2.12.17.jar:?]
[09:46:42] [INFO] [dku.utils]  -at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
~[scala-library-2.12.17.jar:?]
[09:46:42] [INFO] [dku.utils]  -at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
~[scala-library-2.12.17.jar:?]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleStageFailed$1(DAGScheduler.scala:1199)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleStageFailed$1$adapted(DAGScheduler.scala:1199)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.17.jar:?]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGScheduler.handleStageFailed(DAGScheduler.scala:1199)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2981)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils]  -at
org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125

Re: [Issue] Spark SQL - broadcast failure

2024-07-16 Thread Meena Rajani
Can you try disabling broadcast join and see what happens?
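For reference, disabling automatic broadcast joins is a one-line session config change (a sketch; the threshold key is the standard Spark one, and -1 turns auto-broadcast off). Note that an explicit broadcast() hint in the code would still force a broadcast, so it has to be removed as well for this test to be meaningful.

```
-- Sketch: turn off automatic broadcast joins for the session;
-- -1 disables the size threshold entirely.
SET spark.sql.autoBroadcastJoinThreshold=-1;
```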

On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V 
wrote:

> Hi all,
>
> Been facing a weird issue lately.
> In our production code base , we have an explicit broadcast for a small
> table.
> It is just a look up table that is around 1gb in size in s3 and just had
> few million records and 5 columns.
>
> The ETL was running fine , but with no change from the codebase nor the
> infrastructure, we are getting broadcast failures. Even weird fact is the
> older size of the data is 1.4gb while for the new run is just 900 MB
>
> Below is the error message
> Cannot broadcast table that is larger than 8 GB : 8GB.
>
> I find it extremely weird considering that the data size is very well
> under the thresholds.
>
> Are there any other ways to find what could be the issue and how we can
> rectify this issue?
>
> Could the data characteristics be an issue?
>
> Any help would be immensely appreciated.
>
> Thanks
>


Re: [Issue] Spark SQL - broadcast failure

2024-07-16 Thread Mich Talebzadeh
It will help if you mention the Spark version and the problematic piece of
code.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Tue, 16 Jul 2024 at 08:51, Sudharshan V 
wrote:

>
> On Mon, 8 Jul, 2024, 7:53 pm Sudharshan V, 
> wrote:
>
>> Hi all,
>>
>> Been facing a weird issue lately.
>> In our production code base , we have an explicit broadcast for a small
>> table.
>> It is just a look up table that is around 1gb in size in s3 and just had
>> few million records and 5 columns.
>>
>> The ETL was running fine , but with no change from the codebase nor the
>> infrastructure, we are getting broadcast failures. Even weird fact is the
>> older size of the data is 1.4gb while for the new run is just 900 MB
>>
>> Below is the error message
>> Cannot broadcast table that is larger than 8 GB : 8GB.
>>
>> I find it extremely weird considering that the data size is very well
>> under the thresholds.
>>
>> Are there any other ways to find what could be the issue and how we can
>> rectify this issue?
>>
>> Could the data characteristics be an issue?
>>
>> Any help would be immensely appreciated.
>>
>> Thanks
>>
>


Re: Help wanted on securing spark with Apache Knox / JWT

2024-07-12 Thread Adam Binford
You need to use the spark.ui.filters setting on the history server
https://spark.apache.org/docs/latest/configuration.html#spark-ui:

spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter
spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.type=org.apache.hadoop.security.authentication.server.JWTRedirectAuthenticationHandler
spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.authentication.provider.url=https://<knox-hostname>:8443/gateway/knoxsso/api/v1/websso
...etc

On Thu, Jul 11, 2024 at 4:18 PM Thomas Mauran
 wrote:

> Hello,
> I am sending this email to the mailing list, to get your help on a problem
> that I can't seem to resolve myself.
>
> I am trying to secure Spark history ui running with Yarn as master using
> Apache Knox.
>
> From the Knox configuration point of view I managed to secure the Spark
> service, if I go on https://:8443/gateway/default/spark3history I have to
> login using SSO then I get redirected to spark history server web ui which
> works as expected.
>
> But if I directly access Spark without getting logged in I don't get
> redirected to Knox login page which is what I would like to have, same as
> HDFS and YarnUI.
>
> From what I see in Spark documentation the webui needs to be protected
> using the filter system. I can't seem to find a filter to protect my Spark
> history UI using Knox. I protected both HDFS and Yarn by adding this in
> core-site.xml which works fine.
>
> <property>
>   <name>hadoop.http.authentication.type</name>
>   <value>org.apache.hadoop.security.authentication.server.JWTRedirectAuthenticationHandler</value>
> </property>
> <property>
>   <name>hadoop.http.authentication.authentication.provider.url</name>
>   <value>https://<knox-hostname>:8443/gateway/knoxsso/api/v1/websso</value>
> </property>
> <property>
>   <name>hadoop.http.authentication.public.key.pem</name>
>   <value><token></value>
> </property>
>
> Adding those properties allowed me to get redirected to the Knox host page
> when I hadn't logged in yet.
>
> I am wondering if you knew how to secure Spark history UI to have the same
> behavior.
>
> Do you know what configuration I am missing to redirect it back to the
> Knox gateway login page from the Spark history UI as for the other services
> where the JWT token is passed and used for keeping the user session ?
>
> I tried to play with the filters, especially
> org.apache.hadoop.security.authentication.server.AuthenticationFilter, but
> didn't manage to get anything working, so I don't even know if this is
> the right way to do it.
>
> Thanks for your answer
>
>

-- 
Adam Binford


Help wanted on securing spark with Apache Knox / JWT

2024-07-11 Thread Thomas Mauran
Hello, 
I am sending this email to the mailing list, to get your help on a problem that 
I can't seem to resolve myself. 



I am trying to secure Spark history ui running with Yarn as master using Apache 
Knox. 

From the Knox configuration point of view I managed to secure the Spark
service: if I go on https://<knox-hostname>:8443/gateway/default/spark3history I have to
login using SSO, then I get redirected to the Spark history server web UI, which
works as expected.

But if I directly access Spark without getting logged in I don't get redirected
to the Knox login page, which is what I would like to have, same as HDFS and the Yarn UI.

From what I see in the Spark documentation the web UI needs to be protected using
the filter system. I can't seem to find a filter to protect my Spark history
UI using Knox. I protected both HDFS and Yarn by adding this in core-site.xml,
which works fine.

<property>
  <name>hadoop.http.authentication.type</name>
  <value>org.apache.hadoop.security.authentication.server.JWTRedirectAuthenticationHandler</value>
</property>
<property>
  <name>hadoop.http.authentication.authentication.provider.url</name>
  <value>https://<knox-hostname>:8443/gateway/knoxsso/api/v1/websso</value>
</property>
<property>
  <name>hadoop.http.authentication.public.key.pem</name>
  <value><token></value>
</property>


Adding those properties allowed me to get redirected to the Knox host page when I
hadn't logged in yet.

I am wondering if you knew how to secure the Spark history UI to have the same
behavior.

Do you know what configuration I am missing to redirect back to the Knox
gateway login page from the Spark history UI, as for the other services where
the JWT token is passed and used for keeping the user session?

I tried to play with the filters, especially
org.apache.hadoop.security.authentication.server.AuthenticationFilter, but didn't
manage to get anything working, so I don't even know if this is the right
way to do it.

Thanks for your answer 



[Issue] Spark SQL - broadcast failure

2024-07-08 Thread Sudharshan V
Hi all,

We've been facing a weird issue lately.
In our production code base, we have an explicit broadcast for a small
table. It is just a lookup table that is around 1 GB in size in S3 and has
just a few million records and 5 columns.

The ETL was running fine, but with no change to the codebase or the
infrastructure, we are now getting broadcast failures. Even weirder, the
older run's data size is 1.4 GB while the new run's is just 900 MB.

Below is the error message:
Cannot broadcast table that is larger than 8 GB : 8GB.

I find it extremely weird considering that the data size is very well under
the thresholds.

Are there any other ways to find what could be the issue and how we can
rectify this issue?

Could the data characteristics be an issue?

Any help would be immensely appreciated.

Thanks


running snowflake query using spark connect on a standalone cluster

2024-07-07 Thread Prabodh Agarwal
I have configured a spark standalone cluster as follows:

```
# start spark master
$SPARK_HOME/sbin/start-master.sh

# start 2 spark workers
SPARK_WORKER_INSTANCES=2 $SPARK_HOME/sbin/start-worker.sh
spark://localhost:7077

# start spark connect
$SPARK_HOME/sbin/start-connect-server.sh --properties-file
./connect.properties --master spark://localhost:7077
```

My properties file is defined as follows:

```
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.plugins=io.dataflint.spark.SparkDataflintPlugin
spark.jars.packages=org.apache.spark:spark-connect_2.12:3.5.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hudi:hudi-aws:0.15.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.spark:spark-avro_2.12:3.5.1,software.amazon.awssdk:sso:2.18.40,io.dataflint:spark_2.12:0.2.2,net.snowflake:spark-snowflake_2.12:2.16.0-spark_3.4,net.snowflake:snowflake-jdbc:3.16.1
spark.driver.extraJavaOptions=-verbose:class
spark.executor.extraJavaOptions=-verbose:class
```
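One thing worth verifying (an assumption, not a confirmed diagnosis): the executors report `ClassNotFoundException` for a Snowflake class, which suggests the packages resolved for the driver are not reaching the executor classloaders. A sketch of one check is to pass the packages directly on the connect server start command so they are resolved before executors launch:

```
# Hypothetical check: resolve the Snowflake packages at server start
# instead of (or in addition to) the properties file
$SPARK_HOME/sbin/start-connect-server.sh \
  --master spark://localhost:7077 \
  --packages net.snowflake:spark-snowflake_2.12:2.16.0-spark_3.4,net.snowflake:snowflake-jdbc:3.16.1
```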

Now I start my pyspark job which connects with this remote instance and
then tries to query the table. The snowflake query is fired correctly. It
shows up in my Snowflake query history, but then I start getting failures.

```
24/07/07 22:37:26 INFO ErrorUtils: Spark Connect error during: execute.
UserId: pbd. SessionId: 462444eb-82d3-475b-8dd0-ce35d5047405.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
0.0 (TID 3) (192.168.29.6 executor 1): java.lang.ClassNotFoundException:
net.snowflake.spark.snowflake.io.SnowflakeResultSetPartition
at
org.apache.spark.executor.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:124)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
at
java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at
java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
at
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
at
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
at
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException:
net.snowflake.spark.snowflake.io.SnowflakeResultSetPartition
at java.base/java.lang.ClassLoader.findClass(ClassLoader.java:724)
at
org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.java:35)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at
org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.java:40)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at
org.apache.spark.executor.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:109)
... 21 more

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
at scala.Option.foreach(Option.scala:407)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
at

How to use spark.connect.Plan from dependency in a custom Spark Connect RelationPlugin?

2024-06-30 Thread Sem
Hello!

I'm trying to create my own Spark Connect Plugin by implementing
`org.apache.sql.connect.plugin.RelationPlugin`. My target is spark 4.0;

What I did:

1. I added compile time dependency `spark-connect-common_2.13`
2. I added `import public "spark/connect/base.proto";` to my message
3. I added `spark.connect.Plan data = 1;` to my message

I can successfully run `mvn generate-sources` but generated code cannot
be compiled.

I got a lot of errors:
- `org.sparkproject.connect.protobuf.Descriptors.FileDescriptor cannot
be converted to com.google.protobuf.Descriptors.FileDescriptor`
- `type argument org.apache.spark.connect.proto.Plan is not within
bounds of type-variable MType`
- `org.apache.spark.connect.proto.Plan cannot be converted to
com.google.protobuf.MessageLite`

I do not use any `LITE` runtime optimization for my proto message. I
even try to add explicitly `option java_generate_equals_and_hash =
true;` and `option optimize_for=SPEED;` but it did not help.

Is there any way to use `spark.connect.Proto` from `spark-connect-
common` in custom plugins? Or is there any other way to pass
`DataFrame` from client to a plugin that implements `RelationPlugin`?

Thanks in advance!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Does Spark 4.0 add Sparkstreaming SQL

2024-06-26 Thread ????
We integrate with CDC through Spark Streaming SQL, writing stream-processing logic
and window functions in SQL. Is this supported in Spark 4.0?





308027...@qq.com



 

Help Needed: Distributed Logging in Spark Application

2024-06-24 Thread Zsuzsanna D
Hi,

In my Spark application, I need to log data row by row directly from the
executors to avoid overwhelming the driver's memory, which is already under
pressure.

I am exploring the possibility of implementing a distributed logging
strategy where each executor logs its output directly, rather than
collecting all data to the driver first.

Can you recommend best practices or tools for implementing distributed
logging from executors in Spark applications?
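One pattern that avoids the driver entirely (a sketch; the function and logger names below are mine, not a standard API) is to do the logging inside mapPartitions, so each executor writes its own rows through the standard Python logging module to its own stderr/log file:

```python
import logging

def log_rows(partition):
    # Runs on each executor: configure the logger lazily per worker so the
    # output lands in that executor's log, never on the driver.
    logger = logging.getLogger("executor-row-log")
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    for row in partition:
        logger.info("row=%s", row)
        yield row  # pass rows through unchanged

# Hypothetical usage on an existing DataFrame `df`:
# df.rdd.mapPartitions(log_rows).count()  # forces evaluation; logs on executors
```

Aggregating the per-executor files afterwards is then a log-shipping concern (whatever collector the cluster already uses) rather than a Spark one.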

Thank you very much for your time and consideration. I look forward to your
response.

Best,
ZS


Re: Spark Decommission

2024-06-20 Thread Rajesh Mahindra
Thank Ahmed, thats useful information

On Wed, Jun 19, 2024 at 1:36 AM Khaldi, Ahmed 
wrote:

> Hey Rajesh,
>
>
>
> From my experience, it's a stable feature; however, you must keep in mind
> that it will not guarantee that you will not lose the data that is on the
> pods of the nodes getting a spot kill. Once you have a spot kill, you
> have 120s to give the node back to the cloud provider. This is when the
> decommission script will start, and sometimes 120s is enough to migrate the
> shuffle/RDD blocks, and sometimes it's not. It really depends on your
> workload and data in the end.
>
>
>
>
>
> *Best regards,*
>
>
>
> *Ahmed Khaldi*
>
> Solutions Architect
>
>
>
> *NetApp Limited.*
>
> +33617424566 Mobile Phone
>
> kah...@netapp.com 
>
>
>
>
>
>
>
> *From: *Rajesh Mahindra 
> *Date: *Tuesday, 18 June 2024 at 23:54
> *To: *user@spark.apache.org 
> *Subject: *Spark Decommission
>
>
>
>
> Hi folks,
>
>
>
> I am planning to leverage the "Spark Decommission" feature in production
> since our company uses SPOT instances on Kubernetes. I wanted to get a
> sense of how stable the feature is for production usage and if any one has
> thoughts around trying it out in production, especially in kubernetes
> environment.
>
>
>
> Thanks,
>
> Rajesh
>


Re: Help in understanding Exchange in Spark UI

2024-06-20 Thread Mich Talebzadeh
OK, I gave an answer in StackOverflow. Happy reading

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 20 Jun 2024 at 17:15, Dhruv Singla  wrote:

> Hey Team
>
> I've posted a question on StackOverflow. The link is -
> https://stackoverflow.com/questions/78644118/understanding-exchange-in-spark-ui
>
> I haven't got any responses yet. If possible could you please look into
> it? If you need me to write the question in the mailing list, I can do that
> as well.
>
> Thanks & Regards
> Dhruv
>


Help in understanding Exchange in Spark UI

2024-06-20 Thread Dhruv Singla
Hey Team

I've posted a question on StackOverflow. The link is -
https://stackoverflow.com/questions/78644118/understanding-exchange-in-spark-ui

I haven't got any responses yet. If possible could you please look into it?
If you need me to write the question in the mailing list, I can do that as
well.

Thanks & Regards
Dhruv


Re: [SPARK-48423] Unable to save ML Pipeline to azure blob storage

2024-06-19 Thread Chhavi Bansal
Hello Team,
I am pinging back on this thread to get a pair of eyes on this issue.
Ticket:  https://issues.apache.org/jira/browse/SPARK-48423
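A sketch of the workaround described in the ticket (account, key, and paths below are placeholders): setting the key on the session's Hadoop configuration instead of the writer's `.option(...)` does work, but it is session-wide, which is exactly the multi-tenant race-condition concern.

```
# Pseudocode sketch (placeholders): session-wide Hadoop conf
# instead of the per-writer .option(...) that is being ignored
spark.sparkContext._jsc.hadoopConfiguration().set(
    "fs.azure.account.key.<account>.blob.core.windows.net", "<key>")
pipelineModel.write().overwrite().save(
    "wasbs://<container>@<account>.blob.core.windows.net/models/pipeline")
```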

On Thu, 6 Jun 2024 at 00:19, Chhavi Bansal  wrote:

> Hello team,
> I was exploring on how to save ML pipeline to azure blob storage, but was
> set back by an issue where it complains of `fs.azure.account.key` not
> being found in the configuration even when I have provided the values in
> the pipelineModel.option(key1,value1) field. I considered raising a
> ticket on spark https://issues.apache.org/jira/browse/SPARK-48423, where
> I describe the entire scenario. I tried debugging the code and found that
> this key is being explicitly asked for in the code. The only solution was
> to set it again as part of spark.conf, which could result in a race condition
> since we work on multi-tenant architecture.
>
>
>
> Since saving to Azure blob storage would be common, Can someone please
> guide me if I am missing something in the `.option` clause?
>
>
>
> I would be happy to make a contribution to the code if someone can shed
> some light on how this could be solved.
>
> --
> Thanks and Regards,
> Chhavi Bansal
>


-- 
Thanks and Regards,
Chhavi Bansal


Re: Spark Decommission

2024-06-19 Thread Khaldi, Ahmed
Hey Rajesh,

From my experience, it's a stable feature; however, you must keep in mind that 
it will not guarantee that you will not lose the data that is on the pods of 
the nodes getting a spot kill. Once you have a spot kill, you have 120s to give 
the node back to the cloud provider. This is when the decommission script 
will start, and sometimes 120s is enough to migrate the shuffle/rdd blocks, and 
sometimes it's not. It really depends on your workload and data in the end.
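For reference, the relevant knobs are in the standard Spark config set; a sketch (values illustrative, bucket name a placeholder) for spark-defaults.conf:

```
spark.decommission.enabled=true
spark.storage.decommission.enabled=true
spark.storage.decommission.shuffleBlocks.enabled=true
spark.storage.decommission.rddBlocks.enabled=true
# Optional: copy blocks to external storage when no live peer can take them
spark.storage.decommission.fallbackStorage.path=s3a://<bucket>/spark-fallback/
```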


Best regards,

Ahmed Khaldi
Solutions Architect

NetApp Limited.
+33617424566 Mobile Phone
kah...@netapp.com



From: Rajesh Mahindra 
Date: Tuesday, 18 June 2024 at 23:54
To: user@spark.apache.org 
Subject: Spark Decommission


Hi folks,

I am planning to leverage the "Spark Decommission" feature in production since 
our company uses SPOT instances on Kubernetes. I wanted to get a sense of how 
stable the feature is for production usage and if any one has thoughts around 
trying it out in production, especially in kubernetes environment.

Thanks,
Rajesh


Spark Decommission

2024-06-18 Thread Rajesh Mahindra
Hi folks,

I am planning to leverage the "Spark Decommission" feature in production
since our company uses SPOT instances on Kubernetes. I wanted to get a
sense of how stable the feature is for production usage and if any one has
thoughts around trying it out in production, especially in kubernetes
environment.

Thanks,
Rajesh


Re: Update mode in spark structured streaming

2024-06-15 Thread Mich Talebzadeh
Best to qualify your thoughts with an example

By using the foreachBatch function combined with the update output mode in
Spark Structured Streaming, you can effectively handle and integrate
late-arriving data into your aggregations. This approach will allow you to
continuously update your aggregated results with both on-time and late data

example

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, window, sum as spark_sum, max as spark_max, current_timestamp

# Create Spark session
spark = SparkSession.builder.appName("exampleWithRate").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Simulate a stream of data with an event time
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
base_timestamp = current_timestamp()
stream = stream.withColumn("event_time", (base_timestamp + (col("value") * 60).cast("interval second")).cast("timestamp"))
stream = stream.withColumn("value", col("value") % 10)

def process_batch(batch_df, batch_id):
    # Read current state from an external store (simulated here as a static DataFrame)
    current_state = spark.createDataFrame(
        [(1, 10, '2024-06-13 10:00:00')],
        ["key", "total_value", "max_event_time"]
    ).withColumn("max_event_time", col("max_event_time").cast("timestamp"))

    # Perform aggregation including late data handling
    aggregated_batch = batch_df.groupBy("value").agg(
        spark_sum("value").alias("total_value"),
        spark_max("event_time").alias("max_event_time")
    )

    # Merge with current state
    merged_state = current_state.union(aggregated_batch)

    # Show the merged state
    merged_state.show(truncate=False)

# Define your streaming query
streaming_query = (
    stream
    .withWatermark("event_time", "10 minutes")
    .writeStream
    .foreachBatch(process_batch)
    .outputMode("update")
    .start()
)
# Await termination
streaming_query.awaitTermination()


and the output

+---+-----------+-----------------------+
|key|total_value|max_event_time         |
+---+-----------+-----------------------+
|1  |10         |2024-06-13 10:00:00    |
+---+-----------+-----------------------+

+---+-----------+-----------------------+
|key|total_value|max_event_time         |
+---+-----------+-----------------------+
|1  |10         |2024-06-13 10:00:00    |
|0  |0          |2024-06-15 16:22:23.642|
|8  |8          |2024-06-15 16:20:23.642|
|2  |4          |2024-06-15 16:24:23.642|
|4  |8          |2024-06-15 16:26:23.642|
|9  |9          |2024-06-15 16:21:23.642|
|5  |5          |2024-06-15 16:17:23.642|
|1  |2          |2024-06-15 16:23:23.642|
|3  |6          |2024-06-15 16:25:23.642|
|6  |6          |2024-06-15 16:18:23.642|
|7  |7          |2024-06-15 16:19:23.642|
+---+-----------+-----------------------+

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime


Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".



On Fri, 14 Jun 2024 at 20:13, Om Prakash  wrote:

> Hi Team,
>
> Hope you all are doing well. I have run into a use case in which I want to
> do the aggregation in foreachbatch and use update mode for handling late
> data in structured streaming. Will this approach work in effectively
> capturing late arriving data in the aggregations?  Please help.
>
>
>
> Thanking you,
> Om Prakash
>


Update mode in spark structured streaming

2024-06-14 Thread Om Prakash
Hi Team,

Hope you all are doing well. I have run into a use case in which I want to
do the aggregation in foreachbatch and use update mode for handling late
data in structured streaming. Will this approach work in effectively
capturing late arriving data in the aggregations?  Please help.



Thanking you,
Om Prakash


Re: Re: OOM issue in Spark Driver

2024-06-11 Thread Mich Talebzadeh
In a nutshell, the culprit for the OOM  issue in your Spark driver appears
to be memory leakage or inefficient memory usage within your application.
This could be caused by factors such as:

   1. Accumulation of data or objects in memory over time without proper
   cleanup.
   2. Inefficient data processing or transformations leading to excessive
   memory usage.
   3. Long-running tasks or stages that accumulate memory usage.
   4. Suboptimal Spark configuration settings, such as insufficient memory
   allocation for the driver or executors.
   5. Check stages and executor tabs in Spark GUI
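Point 4 above can be checked quickly; a sketch of the driver-side settings to review (standard Spark config keys, values illustrative only):

```
spark.driver.memory=16g
spark.driver.maxResultSize=4g
# Also avoid collecting large results to the driver where possible;
# df.collect() on a big DataFrame is a classic driver-OOM trigger.
```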

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

PhD Imperial College London
London, United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".




On Tue, 11 Jun 2024 at 06:50, Lingzhe Sun  wrote:

> Hi Kathick,
>
> That suggests you're not performing stateful operations, and therefore
> there are no state-related metrics. You should consider other aspects that
> may cause OOM.
> Checking logs will always be a good start. And it would be better if some
> colleague of you is familiar with JVM and OOM related issues.
>
> BS
> Lingzhe Sun
>
>
> *From:* Karthick Nk 
> *Date:* 2024-06-11 13:28
> *To:* Lingzhe Sun 
> *CC:* Andrzej Zera ; User 
> *Subject:* Re: Re: OOM issue in Spark Driver
> Hi Lingzhe,
>
> I am able to get the below stats (i.e. input rate, process rate, input rows,
> etc.), but I am not able to find the exact stats that Andrzej is asking about
> (i.e. Aggregated Number Of Total State Rows). Could you guide me on how to get
> those details for states under Structured Streaming?
> details for states under structured streaming.
> [image: image.png]
>
> Details:
> I am using Databricks runtime version: 13.3 LTS (includes Apache Spark
> 3.4.1, Scala 2.12)
> Driver and worker type:
> [image: image.png]
>
>
> Thanks
>
>
> On Tue, Jun 11, 2024 at 7:34 AM Lingzhe Sun 
> wrote:
>
>> Hi Kathick,
>>
>> I believe that what Andrzej means is that you should check the
>> Aggregated Number Of Total State Rows
>> metric, which you can find in the Structured Streaming UI tab and which
>> indicates the total number of your states, but only if you perform stateful
>> operations. If that increases indefinitely, you should probably check your
>> code logic.
>>
>> BS
>> Lingzhe Sun
>>
>>
>> *From:* Karthick Nk 
>> *Date:* 2024-06-09 14:45
>> *To:* Andrzej Zera 
>> *CC:* user 
>> *Subject:* Re: OOM issue in Spark Driver
>> Hi Andrzej,
>>
>> We are using both driver and workers too,
>> Details are as follows
>> Driver size:128GB Memory, 64 cores.
>> Executors size: 64GB Memory, 32 Cores (Executors 1 to 10 - Autoscale)
>>
>> Workers memory usage:
>> One of the worker memory usage screenshot:
>> [image: image.png]
>>
>>
>> State metrics details below:
>> [image: image.png]
>> [image: image.png]
>>
>> I am not getting memory-related info from the structure streaming tab,
>> Could you help me here?
>>
>> Please let me know if you need more details.
>>
>> If possible we can connect once at your time and look into the issue
>> which will be more helpful to me.
>>
>> Thanks
>>
>> On Sat, Jun 8, 2024 at 2:41 PM Andrzej Zera 
>> wrote:
>>
>>> Hey, do you perform stateful operations? Maybe your state is growing
>>> indefinitely - a screenshot with state metrics would help (you can find it
>>> in Spark UI -> Structured Streaming -> your query). Do you have a
>>> driver-only cluster or do you have workers too? What's the memory usage
>>> profile at workers?
>>>
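For reference, the state metric discussed above also surfaces programmatically; a sketch of pulling it out of `query.lastProgress` in PySpark (the dict shape follows the JSON progress Spark emits; the helper name is mine):

```python
def total_state_rows(progress: dict) -> int:
    """Sum the total state rows across all stateful operators in a
    StreamingQueryProgress dict (query.lastProgress in PySpark)."""
    return sum(op.get("numRowsTotal", 0)
               for op in progress.get("stateOperators", []))

# Illustrative progress payload with two stateful operators:
progress = {"stateOperators": [{"numRowsTotal": 120}, {"numRowsTotal": 30}]}
print(total_state_rows(progress))  # 150
```

Polling this per micro-batch (or via a StreamingQueryListener) makes a steadily growing value easy to spot without the UI.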

Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-08 Thread Someshwar Kale
Hi Chhavi,

Currently there is no way to handle backticks (`) in a Spark StructType. Hence the
field names a.b and `a.b` are completely different within a StructType.

To handle that, I have added a custom implementation fixing
StringIndexer#validateAndTransformSchema. You can refer to the code on my GitHub
<https://github.com/skale1990/LearnSpark/blob/main/src/main/java/com/som/learnspark/TestCustomStringIndexer.scala>
.

*Regards,*
*Someshwar Kale *





On Sat, Jun 8, 2024 at 12:00 PM Chhavi Bansal 
wrote:

> Hi Someshwar,
> Thanks for the response, I have added my comments to the ticket
> <https://issues.apache.org/jira/browse/SPARK-48463>.
>
>
> Thanks,
> Chhavi Bansal
>
> On Thu, 6 Jun 2024 at 17:28, Someshwar Kale  wrote:
>
>> As a fix, you may consider adding a transformer to rename columns
>> (perhaps replace all columns with dot to underscore) and use the renamed
>> columns in your pipeline as below-
>>
>> val renameColumn = new RenameColumn()
>>   .setInputCol("location.longitude")
>>   .setOutputCol("location_longitude")
>> val si = new StringIndexer()
>>   .setInputCol("location_longitude")
>>   .setOutputCol("longitude")
>> val pipeline = new Pipeline().setStages(Array(renameColumn, si))
>> pipeline.fit(flattenedDf).transform(flattenedDf).show()
>>
>>
>> refer my comment
>> <https://issues.apache.org/jira/browse/SPARK-48463?focusedCommentId=17852751&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17852751>
>>  for
>> elaboration.
>> Thanks!!
>>
>> *Regards,*
>> *Someshwar Kale*
>>
>>
>>
>>
>>
>> On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal 
>> wrote:
>>
>>> Hello team
>>> I was exploring feature transformation exposed via Mllib on nested
>>> dataset, and encountered an error while applying any transformer to a
>>> column with dot notation naming. I thought of raising a ticket on spark
>>> https://issues.apache.org/jira/browse/SPARK-48463, where I have
>>> mentioned the entire scenario.
>>>
>>> I wanted to get suggestions on what would be the best way to solve the
>>> problem while using the dot notation. One workaround is to use `_` while
>>> flattening the dataframe, but that would mean having an additional overhead
>>> to convert back to `.` (dot notation) since that’s the convention for our
>>> other flattened data.
>>>
>>> I would be happy to make a contribution to the code if someone can shed
>>> some light on how this could be solved.
>>>
>>>
>>>
>>> --
>>> Thanks and Regards,
>>> Chhavi Bansal
>>>
>>
>
> --
> Thanks and Regards,
> Chhavi Bansal
>
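As a language-neutral illustration of the renaming idea above, the dotted-to-underscore mapping can be computed up front and applied with plain `withColumnRenamed` calls before building the pipeline. This is only a sketch, not the custom transformer from the linked repo; the helper works on column names alone, and the commented DataFrame usage is the assumed integration point.

```python
def underscore_safe(columns):
    """Map dotted column names (e.g. 'location.longitude') to underscore
    names that ML transformers can consume directly; other names pass."""
    return {c: c.replace(".", "_") for c in columns if "." in c}

mapping = underscore_safe(["location.longitude", "id"])
print(mapping)  # -> {'location.longitude': 'location_longitude'}

# Assumed usage against a DataFrame `df` before building the pipeline:
# for old, new in mapping.items():
#     df = df.withColumnRenamed(old, new)
```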


Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-08 Thread Chhavi Bansal
Hi Someshwar,
Thanks for the response, I have added my comments to the ticket
<https://issues.apache.org/jira/browse/SPARK-48463>.


Thanks,
Chhavi Bansal

On Thu, 6 Jun 2024 at 17:28, Someshwar Kale  wrote:

> As a fix, you may consider adding a transformer to rename columns (perhaps
> replace all columns with dot to underscore) and use the renamed columns in
> your pipeline as below-
>
> val renameColumn = new RenameColumn()
>   .setInputCol("location.longitude")
>   .setOutputCol("location_longitude")
> val si = new StringIndexer()
>   .setInputCol("location_longitude")
>   .setOutputCol("longitude")
> val pipeline = new Pipeline().setStages(Array(renameColumn, si))
> pipeline.fit(flattenedDf).transform(flattenedDf).show()
>
>
> refer my comment
> <https://issues.apache.org/jira/browse/SPARK-48463?focusedCommentId=17852751&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17852751>
>  for
> elaboration.
> Thanks!!
>
> *Regards,*
> *Someshwar Kale*
>
>
>
>
>
> On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal 
> wrote:
>
>> Hello team
>> I was exploring feature transformation exposed via Mllib on nested
>> dataset, and encountered an error while applying any transformer to a
>> column with dot notation naming. I thought of raising a ticket on spark
>> https://issues.apache.org/jira/browse/SPARK-48463, where I have
>> mentioned the entire scenario.
>>
>> I wanted to get suggestions on what would be the best way to solve the
>> problem while using the dot notation. One workaround is to use `_` while
>> flattening the dataframe, but that would mean having an additional overhead
>> to convert back to `.` (dot notation) since that’s the convention for our
>> other flattened data.
>>
>> I would be happy to make a contribution to the code if someone can shed
>> some light on how this could be solved.
>>
>>
>>
>> --
>> Thanks and Regards,
>> Chhavi Bansal
>>
>

-- 
Thanks and Regards,
Chhavi Bansal


Re: OOM issue in Spark Driver

2024-06-08 Thread Andrzej Zera
Hey, do you perform stateful operations? Maybe your state is growing
indefinitely - a screenshot with state metrics would help (you can find it
in Spark UI -> Structured Streaming -> your query). Do you have a
driver-only cluster or do you have workers too? What's the memory usage
profile at workers?

Regards,
Andrzej


On Sat, 8 Jun 2024 at 10:39, Karthick Nk  wrote:

> Hi All,
>
> I am using PySpark Structured Streaming with Azure Databricks for a data
> load process.
>
> In the pipeline I am using a Job cluster and running only one
> pipeline, and I am getting an out-of-memory issue after running for a
> long time. When I inspect the cluster metrics, I find that the memory
> usage increases over time even when there is no huge volume of data.
>
> [image: image.png]
>
>
> [image: image.png]
>
> After 4 hours of running the pipeline continuously, I get an
> out-of-memory issue where used memory in the driver grows from 47 GB
> to 111 GB, which is almost triple. I am unable to understand why this much
> memory is occupied in the driver. Am I missing anything here? Could
> you guide me to figure out the root cause?
>
> Note:
> 1. I confirmed that the persist and unpersist calls in my code are handled
> properly for every batch execution.
> 2. Data volume is not increasing over time (the stream receives almost the
> same amount of data for every batch)
>
>
> Thanks,
>
>
>
>


OOM issue in Spark Driver

2024-06-07 Thread Karthick Nk
Hi All,

I am using PySpark Structured Streaming with Azure Databricks for a data
load process.

In the pipeline I am using a Job cluster and running only one
pipeline, and I am getting an out-of-memory issue after running for a
long time. When I inspect the cluster metrics, I find that the memory
usage increases over time even when there is no huge volume of data.

[image: image.png]


[image: image.png]

After 4 hours of running the pipeline continuously, I get an
out-of-memory issue where used memory in the driver grows from 47 GB
to 111 GB, which is almost triple. I am unable to understand why this much
memory is occupied in the driver. Am I missing anything here? Could
you guide me to figure out the root cause?

Note:
1. I confirmed that the persist and unpersist calls in my code are handled
properly for every batch execution.
2. Data volume is not increasing over time (the stream receives almost the
same amount of data for every batch)


Thanks,
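Regarding note 1 above: one pattern worth double-checking is that unpersist runs even when a micro-batch fails partway through. A try/finally inside the foreachBatch handler guarantees that. This is a hedged sketch, assuming the job processes batches via foreachBatch; the `batch_df.count()` line is a placeholder for whatever per-batch work the real pipeline does.

```python
def process_batch(batch_df, epoch_id):
    """foreachBatch handler: cache the micro-batch for reuse, and always
    release it, even if the per-batch work raises."""
    batch_df.persist()
    try:
        # Placeholder for the real per-batch work; reusing batch_df more
        # than once here is what justifies the persist.
        batch_df.count()
    finally:
        # Runs on success and on failure, so cached batches cannot pile
        # up in memory across micro-batches.
        batch_df.unpersist()
```

In the real job this would be wired up as `df.writeStream.foreachBatch(process_batch).start()`.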


Re: 7368396 - Apache Spark 3.5.1 (Support)

2024-06-07 Thread Sadha Chilukoori
Hi Alex,

Spark is an open source software available under  Apache License 2.0 (
https://www.apache.org/licenses/), further details can be found here in the
FAQ page (https://spark.apache.org/faq.html).

Hope this helps.


Thanks,

Sadha

On Thu, Jun 6, 2024, 1:32 PM SANTOS SOUZA, ALEX 
wrote:

> Hey guys!
>
>
>
> I am part of the team responsible for software approval at EMBRAER S.A.
> We are currently in the process of approving the Apache Spark 3.5.1
> software and are verifying the licensing of the application.
> Therefore, I would like to kindly request you to answer the questions
> below.
>
> -What type of software? (Commercial, Freeware, Component, etc...)
>  A:
>
> -What is the licensing model for commercial use? (Subscription, Perpetual,
> GPL, etc...)
> A:
>
> -What type of license? (By user, Competitor, Device, Server or others)?
> A:
>
> -Number of installations allowed per license/subscription?
> A:
>
> Can it be used in the defense and aerospace industry? (Company that
> manufactures products for national defense)
> A:
>
> -Does the license allow use in any location regardless of the origin of
> the purchase (tax restriction)?
> A:
>
> -Where can I find the End User License Agreement (EULA) for the version in
> question?
> A:
>
>
>
> Desde já, muito obrigado e qualquer dúvida estou à disposição. / Thank you
> very much in advance and I am at your disposal if you have any questions.
>
>
> Att,
>
>
> Alex Santos Souza
>
> Software Asset Management - Embraer
>
> WhatsApp: +55 12 99731-7579
>
> E-mail: alex.santosso...@dxc.com
>
> DXC Technology
>
> São José dos Campos, SP - Brazil
>
>


7368396 - Apache Spark 3.5.1 (Support)

2024-06-06 Thread SANTOS SOUZA, ALEX
Hey guys!



I am part of the team responsible for software approval at EMBRAER S.A.
We are currently in the process of approving the Apache Spark 3.5.1 software 
and are verifying the licensing of the application.
Therefore, I would like to kindly request you to answer the questions below.

-What type of software? (Commercial, Freeware, Component, etc...)
 A:

-What is the licensing model for commercial use? (Subscription, Perpetual, GPL, 
etc...)
A:

-What type of license? (By user, Competitor, Device, Server or others)?
A:

-Number of installations allowed per license/subscription?
A:

Can it be used in the defense and aerospace industry? (Company that 
manufactures products for national defense)
A:

-Does the license allow use in any location regardless of the origin of the 
purchase (tax restriction)?
A:

-Where can I find the End User License Agreement (EULA) for the version in 
question?
A:



Desde já, muito obrigado e qualquer dúvida estou à disposição. / Thank you very 
much in advance and I am at your disposal if you have any questions.


Att,


Alex Santos Souza

Software Asset Management - Embraer

WhatsApp: +55 12 99731-7579

E-mail: alex.santosso...@dxc.com

DXC Technology

São José dos Campos, SP - Brazil



Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-06 Thread Someshwar Kale
As a fix, you may consider adding a transformer to rename columns (perhaps
replace all columns with dot to underscore) and use the renamed columns in
your pipeline as below-

val renameColumn = new RenameColumn()
  .setInputCol("location.longitude")
  .setOutputCol("location_longitude")
val si = new StringIndexer()
  .setInputCol("location_longitude")
  .setOutputCol("longitude")
val pipeline = new Pipeline().setStages(Array(renameColumn, si))
pipeline.fit(flattenedDf).transform(flattenedDf).show()


refer my comment
<https://issues.apache.org/jira/browse/SPARK-48463?focusedCommentId=17852751&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17852751>
for
elaboration.
Thanks!!

*Regards,*
*Someshwar Kale*





On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal 
wrote:

> Hello team
> I was exploring feature transformation exposed via Mllib on nested
> dataset, and encountered an error while applying any transformer to a
> column with dot notation naming. I thought of raising a ticket on spark
> https://issues.apache.org/jira/browse/SPARK-48463, where I have mentioned
> the entire scenario.
>
> I wanted to get suggestions on what would be the best way to solve the
> problem while using the dot notation. One workaround is to use `_` while
> flattening the dataframe, but that would mean having an additional overhead
> to convert back to `.` (dot notation) since that’s the convention for our
> other flattened data.
>
> I would be happy to make a contribution to the code if someone can shed
> some light on how this could be solved.
>
>
>
> --
> Thanks and Regards,
> Chhavi Bansal
>


[SPARK-48423] Unable to save ML Pipeline to azure blob storage

2024-06-05 Thread Chhavi Bansal
Hello team,
I was exploring how to save an ML pipeline to Azure Blob Storage, but was
set back by an issue where it complains of `fs.azure.account.key` not
being found in the configuration, even when I have provided the values in
the pipelineModel.option(key1, value1) field. I considered raising a ticket
on Spark, https://issues.apache.org/jira/browse/SPARK-48423, where I
describe the entire scenario. I tried debugging the code and found that
this key is explicitly asked for in the code. The only solution was to set
it again as part of spark.conf, which could result in a race condition
since we work on a multi-tenant architecture.



Since saving to Azure Blob Storage is a common use case, can someone please
guide me if I am missing something in the `.option` clause?



I would be happy to make a contribution to the code if someone can shed
some light on how this could be solved.

-- 
Thanks and Regards,
Chhavi Bansal
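For completeness, the spark.conf workaround described in the message above (with its multi-tenant race-condition caveat) looks roughly like the fragment below. This is a hedged config sketch: the storage account, container, and key are placeholders, and `pipeline_model` stands in for the fitted PipelineModel being saved.

```python
# Config fragment; <storage-account>, <container>, and <account-key>
# are placeholders, not real values.
spark.conf.set(
    "fs.azure.account.key.<storage-account>.blob.core.windows.net",
    "<account-key>",
)
pipeline_model.save(
    "wasbs://<container>@<storage-account>.blob.core.windows.net/models/pipeline"
)
```

Because this mutates shared session configuration, two tenants saving with different keys at the same time can race, which is exactly the concern raised above.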


[SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-05 Thread Chhavi Bansal
Hello team
I was exploring feature transformation exposed via Mllib on nested dataset,
and encountered an error while applying any transformer to a column with
dot notation naming. I thought of raising a ticket on spark
https://issues.apache.org/jira/browse/SPARK-48463, where I have mentioned
the entire scenario.

I wanted to get suggestions on what would be the best way to solve the
problem while using the dot notation. One workaround is to use `_` while
flattening the dataframe, but that would mean having an additional overhead
to convert back to `.` (dot notation) since that’s the convention for our
other flattened data.

I would be happy to make a contribution to the code if someone can shed
some light on how this could be solved.



-- 
Thanks and Regards,
Chhavi Bansal


Inquiry Regarding Security Compliance of Apache Spark Docker Image

2024-06-05 Thread Tonmoy Sagar
Dear Apache Team,

I hope this email finds you well.

We are a team from Ernst and Young LLP - India, dedicated to providing 
innovative supply chain solutions for a diverse range of clients. Our team 
recently encountered a pivotal use case necessitating the utilization of 
PySpark for a project aimed at handling substantial volumes of data. As part of 
our deployment strategy, we are endeavouring to implement a Spark-based 
application on our Azure Kubernetes service.

Regrettably, we have encountered challenges from a security perspective with 
the latest Apache Spark Docker image, specifically apache/spark-py:latest. Our 
security team has meticulously conducted an assessment and has generated a 
comprehensive vulnerability report highlighting areas of concern.

Given the non-compliance of the Docker image with our organization's stringent 
security protocols, we find ourselves unable to proceed with its integration 
into our applications. We attach the vulnerability report herewith for your 
perusal.

Considering these circumstances, we kindly request your esteemed team to 
provide any resolutions or guidance that may assist us in mitigating the 
identified security vulnerabilities. Your prompt attention to this matter would 
be greatly appreciated, as it is crucial for the successful deployment and 
operation of our Spark-based application within our infrastructure.

Thank you for your attention to this inquiry, and we look forward to your 
valued support and assistance.



Please find attachment for the vulnerability report
Best Regards,
Tonmoy Sagar | Sr. Consultant | Advisory | Asterisk
Ernst & Young LLP
C-401, Panchshil Tech Park One, Yerawada, Pune, Maharashtra 411006, India
Mobile: +91 8724918230 | tonmoy.sa...@in.ey.com<mailto:tonmoy.sa...@in.ey.com>
Thrive in the Transformative Age with the better-connected consultants - 
ey.com/consulting<http://ey.com/consulting>







[ANNOUNCE] Announcing Apache Spark 4.0.0-preview1

2024-06-03 Thread Wenchen Fan
Hi all,

To enable wide-scale community testing of the upcoming Spark 4.0 release,
the Apache Spark community has posted a preview release of Spark 4.0. This
preview is not a stable release in terms of either API or functionality,
but it is meant to give the community early access to try the code that
will become Spark 4.0. If you would like to test the release, please
download it, and send feedback using either the mailing lists or JIRA.

There are a lot of exciting new features added to Spark 4.0, including ANSI
mode by default, Python data source, polymorphic Python UDTF, string
collation support, new VARIANT data type, streaming state store data
source, structured logging, Java 17 by default, and many more.

We'd like to thank our contributors and users for their contributions and
early feedback to this release. This release would not have been possible
without you.

To download Spark 4.0.0-preview1, head over to the download page:
https://archive.apache.org/dist/spark/spark-4.0.0-preview1 . It's also
available in PyPI, with version name "4.0.0.dev1".

Thanks,

Wenchen


[apache-spark][spark-dataframe] DataFrameWriter.partitionBy does not guarantee previous sort result

2024-05-31 Thread leeyc0
I have a dataset that have the following schema:
(timestamp, partitionKey, logValue)

I want the dataset to be sorted by timestamp, but written to files in
the following directory layout:
outputDir/partitionKey/files
The output files only contain logValue; that is, timestamp is used for
sorting only and is not part of the output.
(FYI, logValue contains a textual representation of the timestamp which is
not sortable)

My first attempt is to use DataFrameWriter.partitionBy:
dataset
.sort("timestamp")
.select("partitionKey", "logValue")
.write()
.partitionBy("partitionKey")
.text("output");

However, as mentioned in SPARK-44512 (
https://issues.apache.org/jira/browse/SPARK-44512), this does not guarantee
the output is globally sorted.
(Note: I found that even setting
spark.sql.optimizer.plannedWrite.enabled=false still does not guarantee a
sorted result in a low-memory environment.)

And the developers say DataFrameWriter.partitionBy does not guarantee
sorted results:
"Although I understand Apache Spark 3.4.0 changes the behavior like the
above, I don't think there is a contract that Apache Spark's `partitionBy`
operation preserves the previous ordering."

To work around this problem, I have to resort to creating a Hadoop output
format by extending org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
and writing the files via saveAsHadoopFile:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public final class PartitionedMultipleTextOutputFormat<V>
        extends MultipleTextOutputFormat<Object, V> {
    @SuppressWarnings("MissingJavadocMethod")
    public PartitionedMultipleTextOutputFormat() {
        super();
    }

    @Override
    protected Object generateActualKey(final Object key, final V value) {
        return NullWritable.get();
    }

    @Override
    protected String generateFileNameForKeyValue(final Object key, final V value,
            final String leaf) {
        return new Path(key.toString(), leaf).toString();
    }
}

private static Tuple2<String, Text> mapRDDToDomainLogPair(final Row row) {
    final String domain = row.getAs("partitionKey");
    final var log = (String) row.getAs("logValue");
    final var logTextClass = new Text(log);
    return new Tuple2<>(domain, logTextClass);
}

dataset
.sort("timestamp")
.javaRDD()
.mapToPair(TheClass::mapRDDToDomainLogPair)
.saveAsHadoopFile(hdfsTmpPath, String.class, Text.class,
PartitionedMultipleTextOutputFormat.class, GzipCodec.class);

This seems a little hacky.
Does anyone have a better method?


Re: [s3a] Spark is not reading s3 object content

2024-05-31 Thread Amin Mosayyebzadeh
I am reading from a single file:
df = spark.read.text("s3a://test-bucket/testfile.csv")



On Fri, May 31, 2024 at 5:26 AM Mich Talebzadeh 
wrote:

> Tell Spark to read from a single file
>
> data = spark.read.text("s3a://test-bucket/testfile.csv")
>
> This clarifies to Spark that you are dealing with a single file and avoids
> any bucket-like interpretation.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh 
> wrote:
>
>> I will work on the first two possible causes.
>> For the third one, which I guess is the real problem, Spark treats the
>> testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket
>> to access _spark_metadata with url
>> s3a://test-bucket/testfile.csv/_spark_metadata
>> testfile.csv is an object and should not be treated as a bucket. But I am
>> not sure how to prevent Spark from doing that.
>>
>


Re: [s3a] Spark is not reading s3 object content

2024-05-31 Thread Mich Talebzadeh
Tell Spark to read from a single file

data = spark.read.text("s3a://test-bucket/testfile.csv")

This clarifies to Spark that you are dealing with a single file and avoids
any bucket-like interpretation.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh 
wrote:

> I will work on the first two possible causes.
> For the third one, which I guess is the real problem, Spark treats the
> testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket
> to access _spark_metadata with url
> s3a://test-bucket/testfile.csv/_spark_metadata
> testfile.csv is an object and should not be treated as a bucket. But I am
> not sure how to prevent Spark from doing that.
>


Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
I will work on the first two possible causes.
For the third one, which I guess is the real problem, Spark treats the
testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket
to access _spark_metadata with url
s3a://test-bucket/testfile.csv/_spark_metadata
testfile.csv is an object and should not be treated as a bucket. But I am
not sure how to prevent Spark from doing that.
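Since the store in question is Ceph-based rather than AWS S3, it is also worth double-checking the S3A endpoint settings, as missing path-style access is a frequent source of odd object/bucket resolution against non-AWS stores. The fragment below is a hedged config sketch using standard `fs.s3a.*` keys from hadoop-aws; the endpoint URL and credentials are placeholders.

```python
# Config fragment; endpoint and credentials are placeholders.
spark.conf.set("spark.hadoop.fs.s3a.endpoint", "https://ceph.example.internal")
spark.conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
spark.conf.set("spark.hadoop.fs.s3a.access.key", "<access-key>")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "<secret-key>")

df = spark.read.text("s3a://test-bucket/testfile.csv")
```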


Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Mich Talebzadeh
ok

some observations


   - Spark job successfully lists the S3 bucket containing testfile.csv.
   - Spark job can retrieve the file size (33 Bytes) for testfile.csv.
   - Spark job fails to read the actual data from testfile.csv.
   - The printed content from testfile.csv is an empty list.
   - Spark logs show a debug message with an exception related to
   UserGroupInformation while trying to access the _spark_metadata file
   associated with testfile.csv.

possible causes


   - Permission Issues: Spark user (likely ubuntu based on logs) might lack
   the necessary permissions to access the testfile.csv file or the
   _spark_metadata file on S3 storage.
   - Spark Configuration: Issues with Spark's configuration for S3 access,
   such as missing credentials or incorrect security settings.
   - Spark attempting to read unnecessary files: The _spark_metadata file
   might not be essential for your current operation, and Spark's attempt to
   read it could be causing the issue.


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 30 May 2024 at 22:29, Amin Mosayyebzadeh 
wrote:

> The code should read the testfile.csv file from S3 and print its content. It
> only prints an empty list, although the file has content.
> I have also checked our custom S3 storage (Ceph-based) logs and I see only
> LIST operations coming from Spark; there is no GET object operation for
> testfile.csv.
>
> The only error I see in DEBUG output is these lines:
>
> =
> 24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
> from s3a://test-bucket/testfile.csv/_spark_metadata
> 24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
> (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
> java.lang.Exception
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
> at
> org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
> at
> org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:311)
> at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:352)
> at
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:64)
> at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
> at
> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:91)
> at
> org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:52)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
> at
> org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
> at scala.Option.getOrElse(Option.scala:201)
> at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at
> org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> at py4j.Gateway.invoke(Gateway.java:282)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
The code should read the testfile.csv file from S3 and print its content. It
only prints an empty list, although the file has content.
I have also checked our custom S3 storage (Ceph-based) logs and I see only
LIST operations coming from Spark; there is no GET object operation for
testfile.csv.

The only error I see in DEBUG output is these lines:

=
24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
(auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
java.lang.Exception
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at
org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
at
org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:311)
at
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:352)
at
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
at
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:64)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
at
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:91)
at
org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:52)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:201)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at
org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)

===
I am not sure if this is related, since Spark can see and list the
bucket (it also returns the correct object size, which is 33 bytes).

Best,
Amin


On Thu, May 30, 2024 at 4:05 PM Mich Talebzadeh 
wrote:

> Hello,
>
> Overall, the exit code of 0 suggests a successful run of your Spark job.
> Analyze the intended purpose of your code and verify the output or Spark UI
> for further confirmation.
>
> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with
> exitCode 0.
>
> what to check
>
>
>1. Verify Output: If your Spark job was intended to read data from S3
>and process it, you will need to verify the output to ensure the data was
>handled correctly. This might involve checking if any results were written
>to a designated location or if any transformations were applied
>    successfully.
>2. Review Code:
>3. Check Spark UI:
>
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernh

Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Mich Talebzadeh
Hello,

Overall, the exit code of 0 suggests a successful run of your Spark job.
Analyze the intended purpose of your code and verify the output or Spark UI
for further confirmation.

24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with exitCode
0.

what to check


   1. Verify Output: If your Spark job was intended to read data from S3
   and process it, you will need to verify the output to ensure the data was
   handled correctly. This might involve checking if any results were written
   to a designated location or if any transformations were applied
   successfully.
   2. Review Code:
   3. Check Spark UI:


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh 
wrote:

> Hi Mich,
>
> Thank you for the help and sorry about the late reply.
> I ran the code you provided, but I got "exitCode 0". Here is the complete output:
>
> =======
>
>
> 24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
> 24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic,
> amd64
> 24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
> 24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 24/05/30 01:23:38 INFO ResourceUtils:
> ==
> 24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
> spark.driver.
> 24/05/30 01:23:38 INFO ResourceUtils:
> ==
> 24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
> 24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
> executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
> , memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
> name: cpus, amount: 1.0)
> 24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
> 24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
> 24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: ubuntu; groups
> with view permissions: EMPTY; users with modify permissions: ubuntu; groups
> with modify permissions: EMPTY
> 24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver'
> on port 46321.
> 24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
> BlockManagerMasterEndpoint up
> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
> 24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
> 24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8
> GiB
> 24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
> 24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
> 24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
> MOC-R4PAC08U33-S1C
> 24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
> 24/05/30 01:23:39 INFO Executor: Java version 11.0.22
> 24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
> (userClassPathFirst = false): ''
> 24/05/30 01:23:39 INFO Executor: Created or updated repl 

Re: [s3a] Spark is not reading s3 object content

2024-05-29 Thread Amin Mosayyebzadeh
Hi Mich,

Thank you for the help and sorry about the late reply.
I ran the code you provided, but I got "exitCode 0". Here is the complete output:

===


24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic, amd64
24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
24/05/30 01:23:38 INFO ResourceUtils:
==
24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
spark.driver.
24/05/30 01:23:38 INFO ResourceUtils:
==
24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
, memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
name: cpus, amount: 1.0)
24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: ubuntu; groups
with view permissions: EMPTY; users with modify permissions: ubuntu; groups
with modify permissions: EMPTY
24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver' on
port 46321.
24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
org.apache.spark.storage.DefaultTopologyMapper for getting topology
information
24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
BlockManagerMasterEndpoint up
24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8
GiB
24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
MOC-R4PAC08U33-S1C
24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
24/05/30 01:23:39 INFO Executor: Java version 11.0.22
24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
(userClassPathFirst = false): ''
24/05/30 01:23:39 INFO Executor: Created or updated repl class loader
org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
24/05/30 01:23:39 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
24/05/30 01:23:39 INFO NettyBlockTransferService: Server created on
MOC-R4PAC08U33-S1C:39343
24/05/30 01:23:39 INFO BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy
24/05/30 01:23:39 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManagerMasterEndpoint: Registering block
manager MOC-R4PAC08U33-S1C:39343 with 2.8 GiB RAM, BlockManagerId(driver,
MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManager: Initialized BlockManager:
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO SharedState: Setting hive.metastore.warehouse.dir
('null') to the value of spark.sql.warehouse.dir.
24/05/30 01:23:39 INFO SharedState: Warehouse path is
'file:/home/ubuntu/tpch-spark/spark-warehouse'.
24/05/30 01:23:40 WARN MetricsConfig: Cannot locate configuration: tried
hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/05/30 01:23:40 INFO MetricsSystemImpl: Scheduled Metric snapshot period
at 10 second(s).
24/05/30 01:23:40 INFO MetricsSystemImpl: s3a-file-system metrics system
started
24/05/30 01:23:41 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 01:23:41 INFO FileStreamSinkLog: BatchIds found from listing:
24/05/30 01:23:43 INFO FileSo

[Spark on k8s] A issue of k8s resource creation order

2024-05-29 Thread Tao Yang
Hi, team! I have a Spark-on-k8s issue which I posted at
https://stackoverflow.com/questions/78537132/spark-on-k8s-resource-creation-order

Need help


Re: Spark Protobuf Deserialization

2024-05-27 Thread Sandish Kumar HN
Did you try using to_protobuf and from_protobuf ?

https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html


On Mon, May 27, 2024 at 15:45 Satyam Raj  wrote:

> Hello guys,
> We're using Spark 3.5.0 for processing Kafka source that contains protobuf
> serialized data. The format is as follows:
>
> message Request {
>   int64 sent_ts = 1;
>   repeated Event event = 2;
> }
>
> message Event {
>   string event_name = 1;
>   bytes event_bytes = 2;
> }
>
> The event_bytes contains the data for the event_name. event_name is the
> className of the Protobuf class.
> Currently, we parse the protobuf message from the Kafka topic, and for
> every event in the array, push the event_bytes to the `event_name` topic,
> over which spark jobs run and use the same event_name protobuf class to
> deserialize the data.
>
> Is there a better way to do all this in a single job?
>


Re: [Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?

2024-05-27 Thread Mich Talebzadeh
When you use applyInPandasWithState, Spark processes each input row as it
arrives, regardless of whether certain columns, such as the timestamp
column, contain NULL values. This behavior is useful where you want to
handle incomplete or missing data gracefully within your stateful
processing logic. By allowing NULL timestamps to trigger calls to the
stateful function, you can implement custom handling strategies, such as
skipping incomplete records, within your stateful function.


However, it is important to understand that this behavior also *means that
the watermark is not advanced for NULL timestamps*. The watermark is used
for event-time processing in Spark Structured Streaming to track the
progress of event-time in your data stream, and is typically based on the
timestamp column. Since NULL timestamps do not contribute to watermark
advancement, event-time progress is driven entirely by the records that do
carry timestamps.

Regarding whether you can rely on this behavior for your production code,
it largely depends on your requirements and use case. If your application
logic is designed to handle NULL timestamps appropriately and you have
tested it to ensure it behaves as expected, then you can generally rely on
this behavior. FYI, I have not tested it myself, so I cannot provide a
definitive answer.
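
To make the watermark point concrete, here is a minimal pure-Python sketch (not
Spark internals; the function name and the 10-minute delay are illustrative
assumptions) of how an event-time watermark typically advances, and why a NULL
timestamp leaves it untouched:

```python
from datetime import datetime, timedelta

def advance_watermark(current_wm, event_ts, delay=timedelta(minutes=10)):
    # A None (NULL) timestamp leaves the watermark unchanged, mirroring the
    # behavior described above: the record is still processed, but event-time
    # progress does not move forward.
    if event_ts is None:
        return current_wm
    candidate = event_ts - delay
    return candidate if current_wm is None else max(current_wm, candidate)

wm = advance_watermark(None, datetime(2024, 5, 27, 12, 0))  # advances
wm = advance_watermark(wm, None)                            # unchanged
print(wm)  # 2024-05-27 11:50:00
```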

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom




On Mon, 27 May 2024 at 22:04, Juan Casse  wrote:

> I am using applyInPandasWithState in PySpark 3.5.0.
>
> I noticed that records with timestamp==NULL are processed (i.e., trigger a
> call to the stateful function). And, as you would expect, does not advance
> the watermark.
>
> I am taking advantage of this in my application.
>
> My question: Is this a supported feature of Spark? Can I rely on this
> behavior for my production code?
>
> Thanks,
> Juan
>


Spark Protobuf Deserialization

2024-05-27 Thread Satyam Raj
Hello guys,
We're using Spark 3.5.0 for processing Kafka source that contains protobuf
serialized data. The format is as follows:

message Request {
  int64 sent_ts = 1;
  repeated Event event = 2;
}

message Event {
  string event_name = 1;
  bytes event_bytes = 2;
}

The event_bytes contains the data for the event_name. event_name is the
className of the Protobuf class.
Currently, we parse the protobuf message from the Kafka topic, and for
every event in the array, push the event_bytes to the `event_name` topic,
over which spark jobs run and use the same event_name protobuf class to
deserialize the data.

Is there a better way to do all this in a single job?
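
One way to fold this into a single job is to dispatch on event_name instead of
republishing event_bytes to per-event topics. The sketch below is plain Python,
with a dict of lambdas standing in for the generated protobuf classes
("OrderCreated" and "UserSignup" are made-up names, not from the pipeline
above), just to show the registry-dispatch shape:

```python
# Map event_name -> deserializer; in the real job these would be generated
# protobuf message classes, here plain lambdas stand in for them (assumption).
DESERIALIZERS = {
    "OrderCreated": lambda raw: {"kind": "order", "payload": raw.decode()},
    "UserSignup":   lambda raw: {"kind": "user",  "payload": raw.decode()},
}

def route_event(event_name, event_bytes):
    # Surface unknown event types explicitly rather than dropping them.
    try:
        decode = DESERIALIZERS[event_name]
    except KeyError:
        raise ValueError(f"no deserializer registered for {event_name!r}")
    return decode(event_bytes)

print(route_event("OrderCreated", b"order-42"))
# {'kind': 'order', 'payload': 'order-42'}
```

Inside Spark, the same registry could be applied per event in a UDF or
mapPartitions over the parsed Request messages; whether that beats separate
topics depends on how different the downstream processing per event type is.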


[Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?

2024-05-27 Thread Juan Casse
I am using applyInPandasWithState in PySpark 3.5.0.

I noticed that records with timestamp==NULL are processed (i.e., trigger a
call to the stateful function). And, as you would expect, does not advance
the watermark.

I am taking advantage of this in my application.

My question: Is this a supported feature of Spark? Can I rely on this
behavior for my production code?

Thanks,
Juan


Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Shay Elbaz
Seen this before; had a very(!) complex plan behind a DataFrame, to the point 
where any additional transformation went OOM on the driver.

A quick and ugly solution was to break the plan - convert the DataFrame to rdd 
and back to DF at certain points to make the plan shorter. This has obvious 
drawbacks, and is not recommended in general, but at least we had something 
working. The real, long-term solution was to replace the many (> 200)
withColumn() calls with only a few select() calls. You can easily find sources on
the internet for why this is better. (it was on Spark 2.3, but I think the main 
principles remain). TBH, it was easier than I expected, as it mainly involved 
moving pieces of code from one place to another, and not a "real", meaningful 
refactoring.
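
As a toy model of why the refactor helps (plain Python, not Catalyst; it assumes
each withColumn wraps the previous logical plan in one more projection node,
which simplifies what Spark actually does): 200 chained calls yield a 200-deep
plan, while a single select adds one layer:

```python
def with_column(plan, name):
    # Each call wraps the previous plan in one more node.
    return ("Project", name, plan)

def select(plan, names):
    # One node covers all requested columns.
    return ("Project", tuple(names), plan)

def depth(plan):
    # Count nested plan nodes down to the leaf (a plain string here).
    d = 0
    while isinstance(plan, tuple):
        d += 1
        plan = plan[-1]
    return d

chained = "scan"
for i in range(200):
    chained = with_column(chained, f"c{i}")

print(depth(chained))                                        # 200
print(depth(select("scan", [f"c{i}" for i in range(200)])))  # 1
```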



From: Mich Talebzadeh 
Sent: Monday, May 27, 2024 15:43
Cc: user@spark.apache.org 
Subject: Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame 
Processing



Few ideas on top of my head for how to go about solving the problem


  1.  Try with subsets: Try reproducing the issue with smaller subsets of your 
data to pinpoint the specific operation causing the memory problems.
  2.  Explode or Flatten Nested Structures: If your DataFrame schema involves 
deep nesting, consider using techniques like explode or flattening to transform 
it into a less nested structure. This can reduce memory usage during operations 
like withColumn.
  3.  Lazy Evaluation: Use select before withColumn: this ensures lazy 
evaluation, meaning Spark only materializes the data when necessary. This can 
improve memory usage compared to directly calling withColumn on the entire 
DataFrame.
  4.  spark.sql.shuffle.partitions: Setting this configuration to a value close 
to the number of executors can improve shuffle performance and potentially 
reduce memory usage.
  5.  Spark UI Monitoring: Utilize the Spark UI to monitor memory usage 
throughout your job execution and identify potential memory bottlenecks.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 



On Mon, 27 May 2024 at 12:50, Gaurav Madan 
 wrote:
Dear Community,

I'm reaching out to seek your assistance with a memory issue we've been facing 
while processing certain large and nested DataFrames using Apache Spark. We 
have encountered a scenario where the driver runs out of memory when applying 
the `withColumn` method on specific DataFrames in Spark 3.4.1. However, the 
same DataFrames are processed successfully in Spark 2.4.0.

Problem Summary:
For certain DataFrames, applying the `withColumn` method in Spark 3.4.1 causes 
the driver to choke and run out of memory. However, the same DataFrames are 
processed successfully in Spark 2.4.0.

Heap Dump Analysis:
We performed a heap dump analysis after enabling heap dump on out-of-memory 
errors, and the analysis revealed the following significant frames and local 
variables:

```

org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:4273)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:1622)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2820)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2759)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
 (DataPersistenceUtil.scala:88)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.

Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Mich Talebzadeh
Few ideas on top of my head for how to go about solving the problem


   1. Try with subsets: Try reproducing the issue with smaller subsets of
   your data to pinpoint the specific operation causing the memory problems.
   2. Explode or Flatten Nested Structures: If your DataFrame schema
   involves deep nesting, consider using techniques like explode or flattening
   to transform it into a less nested structure. This can reduce memory usage
   during operations like withColumn.
   3. Lazy Evaluation: Use select before withColumn: this ensures lazy
   evaluation, meaning Spark only materializes the data when necessary. This
   can improve memory usage compared to directly calling withColumn on the
   entire DataFrame.
   4. spark.sql.shuffle.partitions: Setting this configuration to a value
   close to the number of executors can improve shuffle performance and
   potentially reduce memory usage.
   5. Spark UI Monitoring: Utilize the Spark UI to monitor memory usage
   throughout your job execution and identify potential memory bottlenecks.
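
As a concrete starting point for items 4 and 5, a hypothetical spark-submit
invocation might look like the following; every value is a placeholder to tune
for your own cluster, not a recommendation:

```shell
# Hypothetical spark-submit flags; all values below are placeholders.
# spark.sql.shuffle.partitions: item 4, set near the total executor core count.
# spark.eventLog.*: item 5, persist event logs so the Spark UI / history server
# can show memory and shuffle metrics after the job finishes.
spark-submit \
  --conf spark.sql.shuffle.partitions=64 \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=/tmp/spark-events \
  --driver-memory 8g \
  your_job.py
```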

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom





On Mon, 27 May 2024 at 12:50, Gaurav Madan
 wrote:

> Dear Community,
>
> I'm reaching out to seek your assistance with a memory issue we've been
> facing while processing certain large and nested DataFrames using Apache
> Spark. We have encountered a scenario where the driver runs out of memory
> when applying the `withColumn` method on specific DataFrames in Spark
> 3.4.1. However, the same DataFrames are processed successfully in Spark
> 2.4.0.
>
>
> *Problem Summary:*For certain DataFrames, applying the `withColumn`
> method in Spark 3.4.1 causes the driver to choke and run out of memory.
> However, the same DataFrames are processed successfully in Spark 2.4.0.
>
>
> *Heap Dump Analysis:*We performed a heap dump analysis after enabling
> heap dump on out-of-memory errors, and the analysis revealed the following
> significant frames and local variables:
>
> ```
>
> org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:4273)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:1622)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:2820)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:2759)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
> (DataPersistenceUtil.scala:88)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
> (DataPersistenceUtil.scala:19)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
> (BronzeStep.scala:23)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V
> (MainJob.scala:78)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,8
