Re: Structured Streaming and Spark Connect
Hi Anastasiia, Thanks for the email. I think you can tweak the Spark config spark.connect.session.manager.defaultSessionTimeout, which is defined here: https://github.com/apache/spark/blob/343471dac4b96b43a09763d759b6c30760fb626e/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala#L86-L93 Once the session timeout is set to a longer value, Spark won't expire the session and therefore won't kill the query either. Thanks, Wei Liu Anastasiia Sokhova wrote on Mon, Sep 23, 2024 at 09:41: > Dear Spark Team, > > > > I am working with a standalone cluster, and I am using Spark Connect to > submit my applications. > > My current version is 3.5.1. > > > > I am trying to run Structured Streaming Queries with relatively long > trigger intervals (2 hours, 1 day). > > The first issue I encountered was “Streaming query has been idle and > waiting for new data more than 1ms”. I solved it by increasing the > value in the internal config property > ‘spark.sql.streaming.noDataProgressEventInterval’. > > Now my query is not considered idle anymore but Connect expires the > session after ~1 hour, and the query is killed with it. > > > > I believe, I have studied everything I could find online, but I could not > find the answers. > > I would really appreciate if you provided some 😊 > > > > Is it not intended for Spark Connect to support “detached” Streaming > Queries? > > Would you consider detaching StreamingQueries from the sessions that start > them, as they are meant to run continuously? > > Would you consider extending control options in Spark Connect UI (start, > stop, reset checkpoints)? > > It will help the users like me, who want to use Spark’s Structured > Streaming and Connect without running additional applications just to keep > the session alive. > > > > I will be happy to answer any question from your side or provide more > details. > > > > Best regards, > > Anastasiia >
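To make the suggestion concrete, here is a minimal sketch assuming a typical Spark Connect deployment; the timeout value, host and port are illustrative assumptions, not values from this thread. The timeout is a server-side config, so it has to be supplied where the Connect server is launched rather than from the client.

```scala
// Server side (assumption: the bundled start script is used), e.g. in
// spark-defaults.conf or as a --conf flag:
//   ./sbin/start-connect-server.sh \
//     --conf spark.connect.session.manager.defaultSessionTimeout=30h
//
// Client side (assumes the spark-connect-client-jvm artifact is on the classpath):
// nothing changes; the session simply stops being expired after ~1 hour.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .remote("sc://connect-server-host:15002") // placeholder address
  .getOrCreate()
```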
Re: Structured Streaming and Spark Connect
Hi Anastasiia, My take is that in its current form, Spark Connect is not suitable for running long-lived Structured Streaming queries in Standalone mode, especially with long trigger intervals. The lack of support for detached streaming queries makes it problematic for this particular use case. To make Structured Streaming work in Standalone mode, you could: 1. Use spark-submit in cluster mode instead of Spark Connect. 2. Consider alternative cluster managers like YARN or k8s for better driver management. Specific answers: Q: Is Spark Connect intended to support “detached” Streaming Queries? No, currently Spark Connect ties queries to the client session. Streaming queries stop when the session ends. Detached queries are not yet supported. Q: Could Streaming Queries be detached from the session, as they are continuous? This is a valid request. Detaching streaming queries would allow them to run independently, ensuring long-running jobs don’t stop when the session ends. This would require changes in Spark’s session management. Q: Would you extend control options in Spark Connect UI (start, stop, reset checkpoints)? Yes, adding controls to start, stop, or reset streaming queries would improve usability, especially for production systems. This feature would give users more dynamic management of long-running streaming jobs. Have a look at this article of mine: Building an Event-Driven Real-Time Data Processor with Spark Structured Streaming and API Integration <https://www.linkedin.com/pulse/building-event-driven-real-time-data-processor-spark-mich-zy3ef/?trackingId=RIwY%2FePi0jslLiXqOP8mxQ%3D%3D> HTH, Mich Talebzadeh Architect | Data Engineer | Data Science | Financial Crime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>). On Mon, 23 Sept 2024 at 17:41, Anastasiia Sokhova wrote: > Dear Spark Team, > > > > I am working with a standalone cluster, and I am using Spark Connect to > submit my applications. > > My current version is 3.5.1. > > > > I am trying to run Structured Streaming Queries with relatively long > trigger intervals (2 hours, 1 day). > > The first issue I encountered was “Streaming query has been idle and > waiting for new data more than 1ms”. I solved it by increasing the > value in the internal config property > ‘spark.sql.streaming.noDataProgressEventInterval’. > > Now my query is not considered idle anymore but Connect expires the > session after ~1 hour, and the query is killed with it. > > > > I believe, I have studied everything I could find online, but I could not > find the answers. > > I would really appreciate if you provided some 😊 > > > > Is it not intended for Spark Connect to support “detached” Streaming > Queries? > > Would you consider detaching StreamingQueries from the sessions that start > them, as they are meant to run continuously? > > Would you consider extending control options in Spark Connect UI (start, > stop, reset checkpoints)?
> > It will help the users like me, who want to use Spark’s Structured > Streaming and Connect without running additional applications just to keep > the session alive. > > > > I will be happy to answer any question from your side or provide more > details. > > > > Best regards, > > Anastasiia >
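As a concrete illustration of option 1 above, here is a minimal sketch of a self-contained streaming driver that, once submitted, runs independently of any Spark Connect client session. The source, sink, paths and interval are placeholders, not details from Anastasiia's actual job.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Submitted with something like:
//   spark-submit --master spark://master-host:7077 --deploy-mode cluster \
//     --class example.LongTriggerJob long-trigger-job.jar
object LongTriggerJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("long-trigger-job").getOrCreate()

    val query = spark.readStream
      .format("rate")                              // placeholder source
      .load()
      .writeStream
      .format("parquet")
      .option("path", "/data/out")                 // placeholder sink path
      .option("checkpointLocation", "/data/chk")   // placeholder checkpoint path
      .trigger(Trigger.ProcessingTime("2 hours"))
      .start()

    // The driver, not a remote client session, owns the query for its lifetime.
    query.awaitTermination()
  }
}
```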
Structured Streaming and Spark Connect
Dear Spark Team, I am working with a standalone cluster, and I am using Spark Connect to submit my applications. My current version is 3.5.1. I am trying to run Structured Streaming Queries with relatively long trigger intervals (2 hours, 1 day). The first issue I encountered was “Streaming query has been idle and waiting for new data more than 1ms”. I solved it by increasing the value in the internal config property ‘spark.sql.streaming.noDataProgressEventInterval’. Now my query is not considered idle anymore, but Connect expires the session after ~1 hour, and the query is killed with it. I believe I have studied everything I could find online, but I could not find the answers. I would really appreciate it if you provided some 😊 Is it not intended for Spark Connect to support “detached” Streaming Queries? Would you consider detaching StreamingQueries from the sessions that start them, as they are meant to run continuously? Would you consider extending control options in the Spark Connect UI (start, stop, reset checkpoints)? It would help users like me, who want to use Spark’s Structured Streaming and Connect without running additional applications just to keep the session alive. I will be happy to answer any questions from your side or provide more details. Best regards, Anastasiia
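For reference, a short sketch of the workaround described above; the value is an illustrative assumption (the property takes a duration in milliseconds and should comfortably exceed the longest trigger interval):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Raise the "idle query" threshold above the 1-day trigger interval
// (illustrative value: 2 days, expressed in milliseconds).
spark.conf.set("spark.sql.streaming.noDataProgressEventInterval", "172800000")
```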
Compatibility Issue: Spark 3.5.2 Schema Recognition vs. Spark 3.4.0 with Hive Metastore (Case Sensitivity)
I have seen this before on another release but cannot recall the details. 1. *Initial Setup with Spark 3.5.2*: - I was using *Spark 3.5.2* (a recent upgrade), *Beeline 2.3.9*, and *Hadoop 3.1.1*. - The issue encountered was that the schema list was not visible, specifically when querying the database. The `SHOW TABLES` or database-related commands did not return expected results. 2. *Issue Details*: - The error indicated that the schema `DS` could not be found, despite the schema existing. This suggests a case sensitivity issue or a compatibility mismatch between Spark 3.5.2 and the Hive/Hadoop setup. 3. *Solution by Downgrading Spark:* - *Downgraded Spark from 3.5.2 to 3.4.0*. - After downgrading to Spark 3.4.0, the schema names worked when forced to lowercase (e.g., using `ds` instead of `DS`), and I was able to successfully query the databases and schemas. This suggests that Spark 3.5.2 may handle case sensitivity or schema querying differently, which was incompatible with my setup. By downgrading to 3.4.0 and switching to lowercase schema names, the issue was resolved. *Summary:* - *Spark 3.5.2* caused issues with recognizing schema names, possibly due to case sensitivity or Hive metastore compatibility. - *Downgrading to Spark 3.4.0* and using *lowercase schema names* resolved the issue, allowing the database and tables to be accessed correctly. I guess it is a good reminder that version compatibility between Spark, Hive, and Hadoop can sometimes cause issues, particularly with metastore interactions and case sensitivity. HTH Mich Talebzadeh, Architect | Data Engineer | Data Science | Financial Crime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
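A small sketch of the behaviour described above; the schema name ds/DS comes from the message, while the table name and session setup are assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("schema-case-check")
  .enableHiveSupport()     // the schemas live in the Hive metastore
  .getOrCreate()

// Failed to find the schema on the Spark 3.5.2 setup described above:
// spark.sql("SHOW TABLES IN DS").show()

// Worked after downgrading to 3.4.0, with the schema name forced to lower case:
spark.sql("SHOW TABLES IN ds").show()
spark.table("ds.some_table").show(10)   // some_table is a placeholder
```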
Spark SQL readSideCharPadding issue while reading ENUM column from mysql
I have upgraded my Spark job from Spark 3.3.1 to Spark 3.5.0. I am querying a MySQL database and applying `*UPPER(col) = UPPER(value)*` in the subsequent SQL query. It works as expected in Spark 3.3.1, but not with 3.5.0. Where condition: `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*` The *st* column is an ENUM in the database and it is causing the issue. Below is the physical plan of the *FILTER* phase: For 3.3.1: +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED))) For 3.5.0: +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true)) = OPEN) OR (upper(staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR (upper(staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true)) = CLOSED))) ----- I have debugged it and found that Spark added a property in version 3.4.0, i.e. **spark.sql.readSideCharPadding**, which has the default value **true**. Link to the JIRA: https://issues.apache.org/jira/browse/SPARK-40697 It added a new method in class **CharVarcharCodegenUtils**: public static UTF8String readSidePadding(UTF8String inputStr, int limit) { int numChars = inputStr.numChars(); if (numChars == limit) { return inputStr; } else if (numChars < limit) { return inputStr.rpad(limit, SPACE); } else { return inputStr; } } **This method appends whitespace padding to the ENUM values while reading, and that is causing the issue.** --- When I remove the UPPER function from the where condition, the **FILTER** phase looks like this: +- Filter (((staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true) = OPEN ) OR (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true) = REOPEN )) OR (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true) = CLOSED )) **You can see it has added some whitespace after the value, and this query runs fine, giving the correct result.** But with the UPPER function I am not getting the data. -- I have also tried disabling this property (*spark.sql.readSideCharPadding = false*) with the following cases: 1. With the UPPER function in the where clause: it does not push the filters to the database and the *query works fine*. +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)) 2. But when I remove the UPPER function, *it pushes the filter to MySQL with the whitespace and I am not getting the data (THIS IS CAUSING A VERY BIG ISSUE)*. PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON), *Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN )),EqualTo(st,CLOSED ))] I cannot move this filter into the JDBC read query, and I also can't remove the UPPER function from the where clause. I also found the same data getting written to CASSANDRA with *PADDING*.
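One possible workaround sketch, given the behaviour described above: trim the read-side padding before comparing, so the filter no longer depends on whether Spark pads the CHAR/ENUM column. The connection details and table name are placeholders; vn and st are the columns from the message.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rtrim, upper}

val spark = SparkSession.builder().getOrCreate()

val tickets = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/appdb")   // placeholder connection details
  .option("dbtable", "tickets")                        // placeholder table name
  .option("user", "app_user")
  .option("password", "changeme")
  .load()

// rtrim removes the read-side padding added since 3.4.0, so the comparison
// behaves the same whether spark.sql.readSideCharPadding is true or false.
val filtered = tickets.filter(
  upper(col("vn")) === "ERICSSON" &&
  upper(rtrim(col("st"))).isin("OPEN", "REOPEN", "CLOSED")
)
filtered.show()
```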
[Spark Core]: Spark 3.5.0 incompatible with Embedded Kafka
Hi, I have been using Embedded Kafka in my batch job to test if my output is correct. It works correctly up to Spark 3.4.3 but fails with Spark 3.5.0. I have also tried different versions of Embedded Kafka, up to the latest version. What might be the problem? Is this something that needs to be fixed in Spark or in Embedded Kafka? I am using this library: https://github.com/embeddedkafka/embedded-kafka This is the exception I get: io.github.embeddedkafka.KafkaUnavailableException was thrown. io.github.embeddedkafka.KafkaUnavailableException at io.github.embeddedkafka.ops.ProducerOps.publishToKafka(ProducerOps.scala:219) at io.github.embeddedkafka.ops.ProducerOps.publishToKafka(ProducerOps.scala:82) at io.github.embeddedkafka.ops.ProducerOps.publishToKafka$(ProducerOps.scala:72) at MySpec.publishToKafka(MySpec.scala:5) Please help. Thank you. Kind regards, Adesh
Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?
I apologize if my previous explanation was unclear, and I realize I didn’t provide enough context for my question. The reason I want to submit a Spark application to a Kubernetes cluster using the Spark Operator is that I want to use Kubernetes as the Cluster Manager, rather than Standalone mode. This would allow the Spark Connect Driver and Executors to run on different nodes within the Kubernetes cluster. I understand that it is currently possible to launch Spark Connect by setting the Cluster Manager to Standalone. However, in that case, the Driver and Executors run on the same node, which I believe would not scale efficiently. Therefore, I am considering specifying Kubernetes (specifically the Kubernetes API) as the Cluster Manager to dynamically distribute and schedule the Driver and multiple Executor Pods across all nodes in the Kubernetes cluster. With the Spark Operator, it is easy to specify Kubernetes as the Cluster Manager, but the Spark Operator does not allow the use of the "client" deploy mode (i.e., the deploy mode will be "cluster"). On the other hand, Spark Connect does not support the "cluster" deploy mode, leading to a deadlock between the two specifications. That is why I wanted to understand the reason why Spark Connect does not allow the "cluster" deploy mode, and this was the main point of my original question. 2024年9月10日(火) 0:29 Prabodh Agarwal : > Oh. This issue is pretty straightforward to solve actually. Particularly, > in spark-3.5.2. > > Just download the `spark-connect` maven jar and place it in > `$SPARK_HOME/jars`. Then rebuild the docker image. I saw that I had posted > a comment on this Jira as well. I could fix this up for standalone cluster > at least this way. > > On Mon, Sep 9, 2024 at 7:04 PM Nagatomi Yasukazu > wrote: > >> Hi Prabodh, >> >> Thank you for your response. >> >> As you can see from the following JIRA issue, it is possible to run the >> Spark Connect Driver on Kubernetes: >> >> https://issues.apache.org/jira/browse/SPARK-45769 >> >> However, this issue describes a problem that occurs when the Driver and >> Executors are running on different nodes. This could potentially be the >> reason why only Standalone mode is currently supported, but I am not >> certain about it. >> >> Thank you for your attention. >> >> >> 2024年9月9日(月) 12:40 Prabodh Agarwal : >> >>> My 2 cents regarding my experience with using spark connect in cluster >>> mode. >>> >>> 1. Create a spark cluster of 2 or more nodes. Make 1 node as master & >>> other nodes as workers. Deploy spark connect pointing to the master node. >>> This works well. The approach is not well documented, but I could figure >>> it out by hit-and-trial. >>> 2. In k8s, by default; we can actually get the executors to run on >>> kubernetes itself. That is pretty straightforward, but the driver continues >>> to run on a local machine. But yeah, I agree as well, making the driver to >>> run on k8s itself would be slick. >>> >>> Thank you. >>> >>> >>> On Mon, Sep 9, 2024 at 6:17 AM Nagatomi Yasukazu >>> wrote: >>> >>>> Hi All, >>>> >>>> Why is it not possible to specify cluster as the deploy mode for Spark >>>> Connect? >>>> >>>> As discussed in the following thread, it appears that there is an >>>> "arbitrary decision" within spark-submit that "Cluster mode is not >>>> applicable" to Spark Connect. 
>>>> >>>> GitHub Issue Comment: >>>> >>>> https://github.com/kubeflow/spark-operator/issues/1801#issuecomment-2000494607 >>>> >>>> > This will circumvent the submission error you may have gotten if you >>>> tried to just run the SparkConnectServer directly. From my investigation, >>>> that looks to be an arbitrary decision within spark-submit that Cluster >>>> mode is "not applicable" to SparkConnect. Which is sort of true except when >>>> using this operator :) >>>> >>>> I have reviewed the following commit and pull request, but I could not >>>> find any discussion or reason explaining why cluster mode is not available: >>>> >>>> Related Commit: >>>> >>>> https://github.com/apache/spark/commit/11260310f65e1a30f6b00b380350e414609c5fd4 >>>> >>>> Related Pull Request: >>>> https://github.com/apache/spark/pull/39928 >>>> >>>> This restriction poses a significant obstacle when trying to use Spark >>>> Connect with the Spark Operator. If there is a technical reason for this, I >>>> would like to know more about it. Additionally, if this issue is being >>>> tracked on JIRA or elsewhere, I would appreciate it if you could provide a >>>> link. >>>> >>>> Thank you in advance. >>>> >>>> Best regards, >>>> Yasukazu Nagatomi >>>> >>>
Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?
Oh. This issue is pretty straightforward to solve actually. Particularly, in spark-3.5.2. Just download the `spark-connect` maven jar and place it in `$SPARK_HOME/jars`. Then rebuild the docker image. I saw that I had posted a comment on this Jira as well. I could fix this up for standalone cluster at least this way. On Mon, Sep 9, 2024 at 7:04 PM Nagatomi Yasukazu wrote: > Hi Prabodh, > > Thank you for your response. > > As you can see from the following JIRA issue, it is possible to run the > Spark Connect Driver on Kubernetes: > > https://issues.apache.org/jira/browse/SPARK-45769 > > However, this issue describes a problem that occurs when the Driver and > Executors are running on different nodes. This could potentially be the > reason why only Standalone mode is currently supported, but I am not > certain about it. > > Thank you for your attention. > > > 2024年9月9日(月) 12:40 Prabodh Agarwal : > >> My 2 cents regarding my experience with using spark connect in cluster >> mode. >> >> 1. Create a spark cluster of 2 or more nodes. Make 1 node as master & >> other nodes as workers. Deploy spark connect pointing to the master node. >> This works well. The approach is not well documented, but I could figure >> it out by hit-and-trial. >> 2. In k8s, by default; we can actually get the executors to run on >> kubernetes itself. That is pretty straightforward, but the driver continues >> to run on a local machine. But yeah, I agree as well, making the driver to >> run on k8s itself would be slick. >> >> Thank you. >> >> >> On Mon, Sep 9, 2024 at 6:17 AM Nagatomi Yasukazu >> wrote: >> >>> Hi All, >>> >>> Why is it not possible to specify cluster as the deploy mode for Spark >>> Connect? >>> >>> As discussed in the following thread, it appears that there is an >>> "arbitrary decision" within spark-submit that "Cluster mode is not >>> applicable" to Spark Connect. >>> >>> GitHub Issue Comment: >>> >>> https://github.com/kubeflow/spark-operator/issues/1801#issuecomment-2000494607 >>> >>> > This will circumvent the submission error you may have gotten if you >>> tried to just run the SparkConnectServer directly. From my investigation, >>> that looks to be an arbitrary decision within spark-submit that Cluster >>> mode is "not applicable" to SparkConnect. Which is sort of true except when >>> using this operator :) >>> >>> I have reviewed the following commit and pull request, but I could not >>> find any discussion or reason explaining why cluster mode is not available: >>> >>> Related Commit: >>> >>> https://github.com/apache/spark/commit/11260310f65e1a30f6b00b380350e414609c5fd4 >>> >>> Related Pull Request: >>> https://github.com/apache/spark/pull/39928 >>> >>> This restriction poses a significant obstacle when trying to use Spark >>> Connect with the Spark Operator. If there is a technical reason for this, I >>> would like to know more about it. Additionally, if this issue is being >>> tracked on JIRA or elsewhere, I would appreciate it if you could provide a >>> link. >>> >>> Thank you in advance. >>> >>> Best regards, >>> Yasukazu Nagatomi >>> >>
Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?
Hi Prabodh, Thank you for your response. As you can see from the following JIRA issue, it is possible to run the Spark Connect Driver on Kubernetes: https://issues.apache.org/jira/browse/SPARK-45769 However, this issue describes a problem that occurs when the Driver and Executors are running on different nodes. This could potentially be the reason why only Standalone mode is currently supported, but I am not certain about it. Thank you for your attention. 2024年9月9日(月) 12:40 Prabodh Agarwal : > My 2 cents regarding my experience with using spark connect in cluster > mode. > > 1. Create a spark cluster of 2 or more nodes. Make 1 node as master & > other nodes as workers. Deploy spark connect pointing to the master node. > This works well. The approach is not well documented, but I could figure > it out by hit-and-trial. > 2. In k8s, by default; we can actually get the executors to run on > kubernetes itself. That is pretty straightforward, but the driver continues > to run on a local machine. But yeah, I agree as well, making the driver to > run on k8s itself would be slick. > > Thank you. > > > On Mon, Sep 9, 2024 at 6:17 AM Nagatomi Yasukazu > wrote: > >> Hi All, >> >> Why is it not possible to specify cluster as the deploy mode for Spark >> Connect? >> >> As discussed in the following thread, it appears that there is an >> "arbitrary decision" within spark-submit that "Cluster mode is not >> applicable" to Spark Connect. >> >> GitHub Issue Comment: >> >> https://github.com/kubeflow/spark-operator/issues/1801#issuecomment-2000494607 >> >> > This will circumvent the submission error you may have gotten if you >> tried to just run the SparkConnectServer directly. From my investigation, >> that looks to be an arbitrary decision within spark-submit that Cluster >> mode is "not applicable" to SparkConnect. Which is sort of true except when >> using this operator :) >> >> I have reviewed the following commit and pull request, but I could not >> find any discussion or reason explaining why cluster mode is not available: >> >> Related Commit: >> >> https://github.com/apache/spark/commit/11260310f65e1a30f6b00b380350e414609c5fd4 >> >> Related Pull Request: >> https://github.com/apache/spark/pull/39928 >> >> This restriction poses a significant obstacle when trying to use Spark >> Connect with the Spark Operator. If there is a technical reason for this, I >> would like to know more about it. Additionally, if this issue is being >> tracked on JIRA or elsewhere, I would appreciate it if you could provide a >> link. >> >> Thank you in advance. >> >> Best regards, >> Yasukazu Nagatomi >> >
Spark 3.2.1 vs Spark 3.5.2
Hi everyone, We are migrating our ETL tasks from Spark 3.2.1 (Java 11) to Spark 3.5.2 (Java 17). One of these applications that works fine on 3.2 completely kills our cluster on 3.5.2 The clusters consist of five 256GB workers and a 256GB master. The task is run with "--executor-memory 200G” and is completed in about 15 minutes on 3.2.1 However, when I run with "--executor-memory 200G” on 3.5.2, the workers all die eventually because the worker is unable to allocate more shared memory (as far as I can tell because they have to be rebooted). I then tried with "--executor-memory 100G”. This chugs along for about half an hour and then runs out of disk space (/tmp/ has about 125GB) for shared memory. The 3.2.1 Physical Plan is 11268 lines. The 3.5.2 Physical Plan is 12923 lines. All the consumed data consists of parquet files that live on S3 and are accessed using the s3a protocol configured as: spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider # Enables the hadoop s3a committer spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory spark.hadoop.fs.s3a.threads.max 40 spark.hadoop.fs.s3a.connection.maximum 40 The query itself is basically: final var catalogParts = partsSelectorA.selectParts() .union(partsSelectorB.selectParts()) .union(partsSelectorC.selectParts()) .union(partsSelectorD.selectParts()) .distinct() .persist(); This is followed by some further “lightweight" unions that can be ignored as I have tried excluding these with no effect. Each “selectParts()” method is a select statement on a huge table (~156M rows) combined with a half dozen or more left joins with large (~3M rows) tables. I’m considering trying the 3.5.3RC which resolves some left join issues. Any ideas? I can share more details privately if that can help. Regards, Steve Coy This email contains confidential information of and is the copyright of Infomedia. It must not be forwarded, amended or disclosed without consent of the sender. If you received this message by mistake, please advise the sender and delete all copies. Security of transmission on the internet cannot be guaranteed, could be infected, intercepted, or corrupted and you should ensure you have suitable antivirus protection in place. By sending us your or any third party personal details, you consent to (or confirm you have obtained consent from such third parties) to Infomedia’s privacy policy. http://www.infomedia.com.au/privacy-policy/
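Not an answer to the regression itself, but one mitigation sometimes tried when a physical plan grows to thousands of lines is to truncate lineage per branch before the union and distinct. This is only a sketch: it assumes each partsSelector*.selectParts() call above returns a DataFrame on the same SparkSession, and the checkpoint directory is a placeholder.

```scala
// Truncate each branch's lineage so the planner works on four small plans
// plus a union, instead of one ~13k-line plan.
spark.sparkContext.setCheckpointDir("s3a://some-bucket/spark-checkpoints") // placeholder

val branches = Seq(
  partsSelectorA.selectParts(),
  partsSelectorB.selectParts(),
  partsSelectorC.selectParts(),
  partsSelectorD.selectParts()
).map(_.checkpoint())   // Dataset.checkpoint() is eager by default

val catalogParts = branches.reduce(_ union _).distinct().persist()
```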
[CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?
Hi All, Why is it not possible to specify cluster as the deploy mode for Spark Connect? As discussed in the following thread, it appears that there is an "arbitrary decision" within spark-submit that "Cluster mode is not applicable" to Spark Connect. GitHub Issue Comment: https://github.com/kubeflow/spark-operator/issues/1801#issuecomment-2000494607 > This will circumvent the submission error you may have gotten if you tried to just run the SparkConnectServer directly. From my investigation, that looks to be an arbitrary decision within spark-submit that Cluster mode is "not applicable" to SparkConnect. Which is sort of true except when using this operator :) I have reviewed the following commit and pull request, but I could not find any discussion or reason explaining why cluster mode is not available: Related Commit: https://github.com/apache/spark/commit/11260310f65e1a30f6b00b380350e414609c5fd4 Related Pull Request: https://github.com/apache/spark/pull/39928 This restriction poses a significant obstacle when trying to use Spark Connect with the Spark Operator. If there is a technical reason for this, I would like to know more about it. Additionally, if this issue is being tracked on JIRA or elsewhere, I would appreciate it if you could provide a link. Thank you in advance. Best regards, Yasukazu Nagatomi
Re: Spark Thrift Server - Not Scaling Down Executors 3.4.2+
The default value of spark.dynamicAllocation.shuffleTracking.enabled was changed from false to true in Spark 3.4.0, disabling it might help. [1] https://spark.apache.org/docs/latest/core-migration-guide.html#upgrading-from-core-33-to-34 Thanks, Cheng Pan > On Sep 6, 2024, at 00:36, Jayabindu Singh wrote: > > Dear Spark Users, > > We have run into an issue where with spark 3.3.2 using auto scaling with STS > is working fine, but with 3.4.2 or 3.5.2 executors are being left behind and > not scaling down. > Driver makes a call to remove the executor but some (not all) executors never > get removed. > > Has anyone else noticed this or aware of any reported issues? > > Any help will be greatly appreciated. > > Regards > Jay > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
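For concreteness, a sketch of the change suggested above. For the Thrift Server these settings would normally go in spark-defaults.conf or on the start-thriftserver.sh command line; the builder form below is the equivalent for a regular application, and turning shuffle tracking back off assumes some other mechanism (such as the external shuffle service) satisfies dynamic allocation's requirements.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  // Back to the pre-3.4.0 default, per the migration-guide entry above:
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "false")
  // With tracking off, dynamic allocation needs another mechanism,
  // e.g. the external shuffle service:
  .config("spark.shuffle.service.enabled", "true")
  .getOrCreate()
```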
Spark Thrift Server - Not Scaling Down Executors 3.4.2+
Dear Spark Users, We have run into an issue: with Spark 3.3.2, auto scaling with STS works fine, but with 3.4.2 or 3.5.2 executors are left behind and do not scale down. The driver makes a call to remove the executor, but some (not all) executors never get removed. Has anyone else noticed this, or is anyone aware of any reported issues? Any help will be greatly appreciated. Regards, Jay
Re: unable to deploy Pyspark application on GKE, Spark installed using bitnami helm chart
I use https://github.com/kubeflow/spark-operator rather than bitnami chart, but https://medium.com/@kayvan.sol2/spark-on-kubernetes-d566158186c6 shows running spark submit from a master pod exec. Might be something to try. On Mon, Aug 26, 2024 at 12:22 PM karan alang wrote: > We are currently using Dataproc on GCP for running our spark workloads, > and i'm planning to move this workload to Kubernetes(GKE). > > Here is what is done so far : > > Installed Spark using bitnami helm chart: > > ``` > > helm repo add bitnami https://charts.bitnami.com/bitnami > > helm install spark -f sparkConfig.yaml bitnami/spark -n spark > > ``` > > Also, deployed a loadbalancer, yaml used : > > ``` > > apiVersion: v1kind: Servicemetadata: > name: spark-master-lb > labels: > app: spark > component: LoadBalancerspec: > selector: > app.kubernetes.io/component: master > app.kubernetes.io/instance: spark > app.kubernetes.io/name: spark > ports: > - name: webui > port: 8080 > targetPort: 8080 > - name: master > port: 7077 > targetPort: 7077 > type: LoadBalancer > > ``` > > Spark is installed, and the pods have come up. > > When i try to do a spark-submit in cluster mode, it gives following error: > > ``` > > (base) Karans-MacBook-Pro:fromEdward-jan26 karanalang$ > $SPARK_HOME/bin/spark-submit --master spark://:7077 > --deploy-mode cluster --name spark-on-gke > local:///Users/karanalang/Documents/Technology/0.spark-on-gke/StructuredStream-on-gke.py24/08/26 > 12:03:26 WARN Utils: Your hostname, Karans-MacBook-Pro.local resolves to a > loopback address: 127.0.0.1; using 10.42.28.138 instead (on interface > en0)24/08/26 12:03:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to > another addressWARNING: An illegal reflective access operation has > occurredWARNING: Illegal reflective access by > org.apache.spark.unsafe.Platform > (file:/Users/karanalang/Documents/Technology/spark-3.1.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.3.jar) > to constructor java.nio.DirectByteBuffer(long,int)WARNING: Please consider > reporting this to the maintainers of org.apache.spark.unsafe.PlatformWARNING: > Use --illegal-access=warn to enable warnings of further illegal reflective > access operationsWARNING: All illegal access operations will be denied in a > future release > Exception in thread "main" org.apache.spark.SparkException: Cluster deploy > mode is currently not supported for python applications on standalone > clusters. > at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:968) > at > org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:273) > at > org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894) > at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) > at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) > at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) > at > org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > > ``` > > In client mode, it gives the following error : > > 24/08/26 12:06:58 ERROR SparkContext: Error initializing SparkContext. 
> java.lang.NullPointerException > at org.apache.spark.SparkContext.(SparkContext.scala:640) > at > org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58) > at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at > java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:238) > at > py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80) > at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69) > at py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.base/java.lang.Thread.run(Thread.java:829)24/08/26 12:06:58 INFO > SparkContext: SparkContext already stopped. > > Couple of questions : > >1. > >is using the helm chart the correct
unable to deploy Pyspark application on GKE, Spark installed using bitnami helm chart
We are currently using Dataproc on GCP for running our spark workloads, and I'm planning to move this workload to Kubernetes (GKE). Here is what is done so far:

Installed Spark using the bitnami helm chart:

```
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install spark -f sparkConfig.yaml bitnami/spark -n spark
```

Also deployed a loadbalancer, yaml used:

```
apiVersion: v1
kind: Service
metadata:
  name: spark-master-lb
  labels:
    app: spark
    component: LoadBalancer
spec:
  selector:
    app.kubernetes.io/component: master
    app.kubernetes.io/instance: spark
    app.kubernetes.io/name: spark
  ports:
    - name: webui
      port: 8080
      targetPort: 8080
    - name: master
      port: 7077
      targetPort: 7077
  type: LoadBalancer
```

Spark is installed, and the pods have come up.

When I try to do a spark-submit in cluster mode, it gives the following error:

```
(base) Karans-MacBook-Pro:fromEdward-jan26 karanalang$ $SPARK_HOME/bin/spark-submit --master spark://:7077 --deploy-mode cluster --name spark-on-gke local:///Users/karanalang/Documents/Technology/0.spark-on-gke/StructuredStream-on-gke.py
24/08/26 12:03:26 WARN Utils: Your hostname, Karans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.42.28.138 instead (on interface en0)
24/08/26 12:03:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/karanalang/Documents/Technology/spark-3.1.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode is currently not supported for python applications on standalone clusters.
    at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:968)
    at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:273)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

In client mode, it gives the following error:

```
24/08/26 12:06:58 ERROR SparkContext: Error initializing SparkContext.
java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:640)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
24/08/26 12:06:58 INFO SparkContext: SparkContext already stopped.
```

Couple of questions:

1. Is using the helm chart the correct way to install Apache Spark on GKE/k8s? (Note - need to install on both GKE and On-prem kubernetes)
2. How to submit pyspark jobs on a Spark cluster deployed on GKE (eg. do I need to create a K8s deployment for each spark job?)

tia!

Here is the stackoverflow link: https://stackoverflow.com/questions/78915988/unable-to-deploy-pyspark-application-on-gke-spark-installed-using-bitnami-helm
Job Opportunities in India or UK with Tier 2 Sponsorship - Spark Expert
Hi Spark Community, I'm a seasoned Data Engineering professional with 13+ years of experience and expertise in Apache Spark, particularly in Structured Streaming. I'm looking for job opportunities in India or the UK that offer Tier 2 sponsorship. If anyone knows of openings or can connect me with potential employers, please reach out. Thanks, Sri Tummala
Re: Spark Reads from MapR and Write to MinIO fails for few batches
Issue resolved, thanks for your time folks. Sent from my iPhone On Aug 21, 2024, at 5:38 PM, Prem Sahoo wrote: Hello Team, Could you please check on this request? On Mon, Aug 19, 2024 at 7:00 PM Prem Sahoo <prem.re...@gmail.com> wrote: Hello Spark and User, could you please shed some light? On Thu, Aug 15, 2024 at 7:15 PM Prem Sahoo <prem.re...@gmail.com> wrote: Hello Spark and User, we have a Spark project which is a long running Spark session where it does below 1. We are reading from Mapr FS and writing to MapR FS. 2. Another parallel job which reads from MapR Fs and Writes to MinIO object storage. We are finding issues for a few batches of Spark jobs which one writes to MinIO, reads empty data frame/dataset from MapR but the job which reads from & writes to MapR Fs for the same batches never had any issue. I was just going through some blogs and stackoverflow to know that Spark Session which holds both information /config of MapR and Minio sometimes find this issue as Spark Session or context has no correct information so either we need to clear or restart spark session for each batch. Please let me know if you have any suggestions to get rid of this issue.
Expected Release date for Spark 4?
Hi, We've been working with the Spark 4 preview for a while and would like to know the release date of Spark 4. Will you be able to provide a planned release date for Spark 4 to help us plan better? Regards, Vikash
Re: Spark Reads from MapR and Write to MinIO fails for few batches
Hello Team, Could you please check on this request ? On Mon, Aug 19, 2024 at 7:00 PM Prem Sahoo wrote: > Hello Spark and User, > could you please shed some light ? > > On Thu, Aug 15, 2024 at 7:15 PM Prem Sahoo wrote: > >> Hello Spark and User, >> we have a Spark project which is a long running Spark session where it >> does below >> 1. We are reading from Mapr FS and writing to MapR FS. >> 2. Another parallel job which reads from MapR Fs and Writes to MinIO >> object storage. >> >> We are finding issues for a few batches of Spark jobs which one writes to >> MinIO , reads empty data frame/dataset from MapR but the job which reads >> from & writes to MapR Fs for the same batches never had any issue. >> >> I was just going through some blogs and stackoverflow to know that Spark >> Session which holds both information /config of MapR and Minio sometimes >> find this issue as Spark Session or context has no correct information so >> either we need to clear or restart spark session for each batch. >> >> >> Please let me know if you have any suggestions to get rid of this issue. >> >>
Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?
I searched [1] using the keywords “reliable” and got nothing, so I cannot draw the same conclusion as you. If an implementation claims to support reliable storage, it should inherit interface ShuffleDriverComponents and override method supportsReliableStorage [2] to return true, for example, Apache Celeborn [3], a Remote Shuffle Service for Spark. Thanks, Cheng Pan [1] https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage [2] https://github.com/apache/spark/blob/v3.5.2/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java#L65-L72 [3] https://github.com/apache/celeborn/blob/v0.5.1/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/CelebornShuffleDataIO.java#L56 > On Aug 20, 2024, at 18:42, Aaron Grubb wrote: > > Hi Cheng, > > Due to the documentation [1]. This is why I suggested at the end of the > message you replied to that documentation should be updated or > clarified. Can you explain how persistent volume claims in Kubernetes are > "unreliable" storage? > > Thanks, > Aaron > > https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage > > On Tue, 2024-08-20 at 18:37 +0800, Cheng Pan wrote: >> org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO does NOT support >> reliable storage, so the condition 4) is false even with this >> configuration. >> I’m not sure why you think it does. >> >> Thanks, >> Cheng Pan >> >>> On Aug 20, 2024, at 18:27, Aaron Grubb wrote: >>> >>> Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the >>> job however it still was not stable in the face of spot >>> instances >>> going away. Adding spark.decommission.enabled=true, >>> spark.storage.decommission.enabled=true and >>> spark.executor.decommission.killInterval=110 >>> appears to have completely stabilized the job (not sure which did the trick >>> as I added them at the same time). Perhaps extra >>> documentation or >>> clarifications should be added as it doesn't seem clear to me how to >>> arrivate at job stability using dynamic allocation without trial and >>> error. >>> >>> On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote: >>>> Hi all, >>>> >>>> I'm running Spark on Kubernetes on AWS using only spot instances for >>>> executors with dynamic allocation enabled. This particular job is >>>> being >>>> triggered by Airflow and it hit this bug [1] 6 times in a row. However, I >>>> had recently switched to using PersistentVolumeClaims in >>>> Spark >>>> with >>>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO >>>> but kept >>>> spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see >>>> under the notes for spark.dynamicAllocation.enabled [2] that >>>> these >>>> configurations are "or" not "and". However, when setting >>>> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with >>>> the >>>> message >>>> >>>> org.apache.spark.SparkException: Dynamic allocation of executors requires >>>> one of the following conditions: 1) enabling external shuffle >>>> service through spark.shuffle.service.enabled. 2) enabling shuffle >>>> tracking through spark.dynamicAllocation.shuffleTracking.enabled. 3) >>>> enabling shuffle blocks decommission through spark.decommission.enabled >>>> and spark.storage.decommission.shuffleBlocks.enabled. 4) >>>> (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a >>>> custom ShuffleDataIO who's ShuffleDriverComponents supports >>>> reliable >>>> storage. 
>>>> >>>> Am I hitting this bug unavoidably? Or is there a configuration I'm missing >>>> to enable >>>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO >>>> to replace >>>> spark.dynamicAllocation.shuffleTracking.enabled=true? >>>> >>>> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case >>>> >>>> spark.checkpoint.compress >>>> true >>>> spark.driver.cores >>>> 1 &
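To make condition 4) of the quoted error concrete: a shuffle plugin advertises reliable storage by returning true from ShuffleDriverComponents.supportsReliableStorage(), which is what Celeborn's CelebornShuffleDataIO does. The sketch below is purely illustrative of where that flag lives; it is not a working reliable-shuffle implementation, the class name is made up, and it simply delegates to the built-in local-disk plugin.

```scala
package example

import java.util.{Collections, Map => JMap}
import org.apache.spark.SparkConf
import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents}
import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO

// Would be enabled with: spark.shuffle.sort.io.plugin.class=example.MyShuffleDataIO
class MyShuffleDataIO(conf: SparkConf) extends ShuffleDataIO {
  private val delegate = new LocalDiskShuffleDataIO(conf)

  override def executor(): ShuffleExecutorComponents = delegate.executor()

  override def driver(): ShuffleDriverComponents = new ShuffleDriverComponents {
    override def initializeApplication(): JMap[String, String] = Collections.emptyMap()
    override def cleanupApplication(): Unit = ()
    // This is the flag Cheng Pan refers to. The built-in Kubernetes
    // local-disk plugin leaves it at the default, false, so it does not
    // satisfy condition 4).
    override def supportsReliableStorage(): Boolean = true
  }
}
```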
Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?
org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO does NOT support reliable storage, so the condition 4) is false even with this configuration. I’m not sure why you think it does. Thanks, Cheng Pan > On Aug 20, 2024, at 18:27, Aaron Grubb wrote: > > Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job > however it still was not stable in the face of spot instances > going away. Adding spark.decommission.enabled=true, > spark.storage.decommission.enabled=true and > spark.executor.decommission.killInterval=110 > appears to have completely stabilized the job (not sure which did the trick > as I added them at the same time). Perhaps extra documentation or > clarifications should be added as it doesn't seem clear to me how to arrivate > at job stability using dynamic allocation without trial and > error. > > On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote: >> Hi all, >> >> I'm running Spark on Kubernetes on AWS using only spot instances for >> executors with dynamic allocation enabled. This particular job is >> being >> triggered by Airflow and it hit this bug [1] 6 times in a row. However, I >> had recently switched to using PersistentVolumeClaims in Spark >> with >> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO >> but kept >> spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see >> under the notes for spark.dynamicAllocation.enabled [2] that these >> configurations are "or" not "and". However, when setting >> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with >> the >> message >> >> org.apache.spark.SparkException: Dynamic allocation of executors requires >> one of the following conditions: 1) enabling external shuffle >> service through spark.shuffle.service.enabled. 2) enabling shuffle tracking >> through spark.dynamicAllocation.shuffleTracking.enabled. 3) >> enabling shuffle blocks decommission through spark.decommission.enabled and >> spark.storage.decommission.shuffleBlocks.enabled. 4) >> (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a >> custom ShuffleDataIO who's ShuffleDriverComponents supports reliable >> storage. >> >> Am I hitting this bug unavoidably? Or is there a configuration I'm missing >> to enable >> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO >> to replace >> spark.dynamicAllocation.shuffleTracking.enabled=true? >> >> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case >> >> spark.checkpoint.compress >> true >> spark.driver.cores >> 1 >> spark.driver.maxResultSize >> 2g >> spark.driver.memory >> 5140m >> spark.dynamicAllocation.enabled >> true >> spark.dynamicAllocation.executorAllocationRatio >> 0.33 >> spark.dynamicAllocation.maxExecutors >> 20 >> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout >> 30 >> spark.eventLog.enabled >> true >> spark.executor.cores >> 3 >> spark.executor.logs.rolling.enableCompression >> true >> spark.executor.logs.rolling.maxRetainedFiles >> 48 >> spark.executor.logs.rolling.strategy >> time >> spark.executor.logs.rolling.time.interval >> hourly >> spark.hadoop.fs.s3a.impl >> org.apache.hadoop.fs.s3a.S3AFileSystem >> spark.hadoop.fs.s3a.connection.ssl.enabled >> false >> spark.hadoop.fs.s3a.fast.upload >> true >> spark.kryo.registratio
Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?
Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job however it still was not stable in the face of spot instances going away. Adding spark.decommission.enabled=true, spark.storage.decommission.enabled=true and spark.executor.decommission.killInterval=110 appears to have completely stabilized the job (not sure which did the trick as I added them at the same time). Perhaps extra documentation or clarifications should be added as it doesn't seem clear to me how to arrivate at job stability using dynamic allocation without trial and error. On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote: > Hi all, > > I'm running Spark on Kubernetes on AWS using only spot instances for > executors with dynamic allocation enabled. This particular job is > being > triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had > recently switched to using PersistentVolumeClaims in Spark > with > spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO > but kept > spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see > under the notes for spark.dynamicAllocation.enabled [2] that these > configurations are "or" not "and". However, when setting > spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the > message > > org.apache.spark.SparkException: Dynamic allocation of executors requires one > of the following conditions: 1) enabling external shuffle > service through spark.shuffle.service.enabled. 2) enabling shuffle tracking > through spark.dynamicAllocation.shuffleTracking.enabled. 3) > enabling shuffle blocks decommission through spark.decommission.enabled and > spark.storage.decommission.shuffleBlocks.enabled. 4) > (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom > ShuffleDataIO who's ShuffleDriverComponents supports reliable > storage. > > Am I hitting this bug unavoidably? Or is there a configuration I'm missing to > enable > spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO > to replace > spark.dynamicAllocation.shuffleTracking.enabled=true? > > Using Spark 3.5.1 - here's my full spark-defaults.conf just in case > > spark.checkpoint.compress > true > spark.driver.cores >1 > spark.driver.maxResultSize > 2g > spark.driver.memory > 5140m > spark.dynamicAllocation.enabled > true > spark.dynamicAllocation.executorAllocationRatio > 0.33 > spark.dynamicAllocation.maxExecutors > 20 > spark.dynamicAllocation.sustainedSchedulerBacklogTimeout > 30 > spark.eventLog.enabled > true > spark.executor.cores >3 > spark.executor.logs.rolling.enableCompression > true > spark.executor.logs.rolling.maxRetainedFiles > 48 > spark.executor.logs.rolling.strategy > time > spark.executor.logs.rolling.time.interval > hourly > spark.hadoop.fs.s3a.impl > org.apache.hadoop.fs.s3a.S3AFileSystem > spark.hadoop.fs.s3a.connection.ssl.enabled > false > spark.hadoop.fs.s3a.fast.upload > true > spark.kryo.registrationRequired > false > spark.kryo.unsafe > false > spark.kryoserializer.buffer > 1m > spark.kryoserializer.buffer.max > 1g > spark.kubernetes.driver.limit.cores > 750m > spark.kubernetes.driver.ownPersistentVolumeClaim >
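For reference, the combination reported above in config form; the values are exactly those from the message, and in practice they would presumably live in spark-defaults.conf next to the settings quoted above rather than in code.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.decommission.enabled", "true")
  .config("spark.storage.decommission.enabled", "true")
  .config("spark.executor.decommission.killInterval", "110")
  .getOrCreate()
```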
Re: Spark Reads from MapR and Write to MinIO fails for few batches
Hello Spark and User, could you please shed some light ? On Thu, Aug 15, 2024 at 7:15 PM Prem Sahoo wrote: > Hello Spark and User, > we have a Spark project which is a long running Spark session where it > does below > 1. We are reading from Mapr FS and writing to MapR FS. > 2. Another parallel job which reads from MapR Fs and Writes to MinIO > object storage. > > We are finding issues for a few batches of Spark jobs which one writes to > MinIO , reads empty data frame/dataset from MapR but the job which reads > from & writes to MapR Fs for the same batches never had any issue. > > I was just going through some blogs and stackoverflow to know that Spark > Session which holds both information /config of MapR and Minio sometimes > find this issue as Spark Session or context has no correct information so > either we need to clear or restart spark session for each batch. > > > Please let me know if you have any suggestions to get rid of this issue. > >
Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?
Hi all, I'm running Spark on Kubernetes on AWS using only spot instances for executors with dynamic allocation enabled. This particular job is being triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had recently switched to using PersistentVolumeClaims in Spark with spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO but kept spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see under the notes for spark.dynamicAllocation.enabled [2] that these configurations are "or" not "and". However, when setting spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the message org.apache.spark.SparkException: Dynamic allocation of executors requires one of the following conditions: 1) enabling external shuffle service through spark.shuffle.service.enabled. 2) enabling shuffle tracking through spark.dynamicAllocation.shuffleTracking.enabled. 3) enabling shuffle blocks decommission through spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled. 4) (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom ShuffleDataIO who's ShuffleDriverComponents supports reliable storage. Am I hitting this bug unavoidably? Or is there a configuration I'm missing to enable spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO to replace spark.dynamicAllocation.shuffleTracking.enabled=true? Using Spark 3.5.1 - here's my full spark-defaults.conf just in case spark.checkpoint.compress true spark.driver.cores 1 spark.driver.maxResultSize 2g spark.driver.memory 5140m spark.dynamicAllocation.enabled true spark.dynamicAllocation.executorAllocationRatio 0.33 spark.dynamicAllocation.maxExecutors 20 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 30 spark.eventLog.enabled true spark.executor.cores 3 spark.executor.logs.rolling.enableCompression true spark.executor.logs.rolling.maxRetainedFiles 48 spark.executor.logs.rolling.strategy time spark.executor.logs.rolling.time.interval hourly spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem spark.hadoop.fs.s3a.connection.ssl.enabled false spark.hadoop.fs.s3a.fast.upload true spark.kryo.registrationRequired false spark.kryo.unsafe false spark.kryoserializer.buffer 1m spark.kryoserializer.buffer.max 1g spark.kubernetes.driver.limit.cores 750m spark.kubernetes.driver.ownPersistentVolumeClaim true spark.kubernetes.driver.request.cores 750m spark.kubernetes.driver.reusePersistentVolumeClaim true spark.kubernetes.driver.waitToReusePersistentVolumeClaim true spark.kubernetes.executor.limit.cores 3700m spark.kubernetes.executor.request.cores 3700m spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName OnDemand spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path /data/spark-x/executor-x spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly false spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit 20Gi spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-
Spark Job Fails while writing data to a S3 location in Parquet
I am trying to read .csv.gz file in S3 and write it to S3 in Parquet format through a Spark job. Currently I am using AWS EMR service for this. Spark Job is execute as a step in EMR cluster. For some .csv.gz files I have encounter below issue. I have used both spark 3.4 and 3.5 versions and still getting same error. > > Exception in thread "main" java.lang.Exception: > org.apache.spark.SparkException: Job aborted due to stage failure: Task 63 > in stage 4.0 failed 4 times, most recent failure: Lost task 63.3 in stage > 4.0 (TID 71) (ip-100-68-72-50.880639474967.ase1.aws.dev.r53 executor 4): > org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while > writing rows to s3:///TEMP/. > > at > org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789) > > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421) > > at > org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100) > > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888) > > at > org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888) > > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) > > at > org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) > > at org.apache.spark.scheduler.Task.run(Task.scala:141) > > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) > > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) > > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > > at java.base/java.lang.Thread.run(Thread.java:840) > > Caused by: java.io.EOFException: Unexpected end of input stream > > at > org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165) > > at > org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105) > > at > nl.basjes.hadoop.io.compress.SplittableGzipCodec$SplittableGzipInputStream.read(SplittableGzipCodec.java:468) > > at java.base/java.io.InputStream.read(InputStream.java:218) > > at > org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:132) > > at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227) > > at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185) > > at > org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:163) > > at > org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:200) > > at > org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) > > at > org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67) > > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) > > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) > > at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:239) > > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > > at > org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91) > > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:404) > > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575) > > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411) > > ... 15 more Driver stacktrace: > at > com.lseg.lsa.pipeline.core.MIngestionUtilities$.validateInitialDataframe(MIngestionUtilities.scala:205) > at > com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation(DataIngestion.scala:22) > at > com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation$(DataIngestion.scala:21) > at &g
Spark Reads from MapR and Write to MinIO fails for few batches
Hello Spark users, we have a Spark project with a long-running Spark session that does the following: 1. One job reads from MapR FS and writes back to MapR FS. 2. Another parallel job reads from MapR FS and writes to MinIO object storage. For a few batches, the job that writes to MinIO reads an empty DataFrame/Dataset from MapR, while the job that reads from and writes to MapR FS never has any issue for those same batches. From some blogs and Stack Overflow posts, I gather that a Spark session holding the configuration for both MapR and MinIO can sometimes hit this issue because the session or context no longer has the correct information, so we may need to clear or restart the Spark session for each batch. Please let me know if you have any suggestions to get rid of this issue.
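A note on keeping the two filesystems from stepping on each other: the MinIO settings can be scoped to the s3a connector inside a single long-running session instead of clearing or restarting the session per batch. The sketch below is only illustrative, assuming MinIO is reached through hadoop-aws/s3a and MapR through maprfs:// URIs with the MapR client on the classpath; the endpoint, credentials, bucket and paths are placeholders, not values from this thread.

from pyspark.sql import SparkSession

# One session, two filesystems: maprfs:// goes through the MapR client,
# s3a:// goes to MinIO via the endpoint configured below.
spark = (SparkSession.builder
         .appName("mapr-and-minio")
         .config("spark.hadoop.fs.s3a.endpoint", "http://minio-host:9000")
         .config("spark.hadoop.fs.s3a.access.key", "MINIO_ACCESS_KEY")
         .config("spark.hadoop.fs.s3a.secret.key", "MINIO_SECRET_KEY")
         .config("spark.hadoop.fs.s3a.path.style.access", "true")
         .getOrCreate())

df = spark.read.parquet("maprfs:///data/input/batch_0001")                  # read from MapR FS
df.write.mode("overwrite").parquet("s3a://minio-bucket/output/batch_0001")  # write to MinIO

If the empty reads persist even with the configuration pinned this way, comparing the per-batch input paths logged by both jobs would help confirm whether the MapR read itself returned no files for those batches.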
[Spark Connect ] Date Data type formatting issue
Hi all, I am experiencing a date field formatting issue when loading data from a Hive table in Spark via Spark Connect (on an AWS EMR cluster) using the R sparklyr package. The date field is converted to a char type, whereas the same field is loaded as a date type when using our on-premise Spark with YARN connected to the same on-premise Hive table. Could you please advise on how to ensure that date fields are loaded in their native date type when using Spark Connect? Thanks, Elango
Re: [ANNOUNCE] Apache Spark 3.5.2 released
Thank you, Kent! Kent Yao wrote on Mon, 12 Aug 2024 at 08:03: > We are happy to announce the availability of Apache Spark 3.5.2! > > Spark 3.5.2 is the second maintenance release containing security > and correctness fixes. This release is based on the branch-3.5 > maintenance branch of Spark. We strongly recommend all 3.5 users > to upgrade to this stable release. > > To download Spark 3.5.2, head over to the download page: > https://spark.apache.org/downloads.html > > To view the release notes: > https://spark.apache.org/releases/spark-release-3-5-2.html > > We would like to acknowledge all community members for contributing to this > release. This release would not have been possible without you. > > Kent Yao > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
[ANNOUNCE] Apache Spark 3.5.2 released
We are happy to announce the availability of Apache Spark 3.5.2! Spark 3.5.2 is the second maintenance release containing security and correctness fixes. This release is based on the branch-3.5 maintenance branch of Spark. We strongly recommend all 3.5 users to upgrade to this stable release. To download Spark 3.5.2, head over to the download page: https://spark.apache.org/downloads.html To view the release notes: https://spark.apache.org/releases/spark-release-3-5-2.html We would like to acknowledge all community members for contributing to this release. This release would not have been possible without you. Kent Yao - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: [spark connect] unable to utilize stand alone cluster
Glad to help! On Tue, 6 Aug, 2024, 17:37 Ilango, wrote: > > Thanks Praboth. Passing —master attr in spark connect command worked like > charm. I am able to submit spark connect to my existing stand-alone cluster > > Thanks for saving my day once again :) > > Thanks, > Elango > > > On Tue, 6 Aug 2024 at 6:08 PM, Prabodh Agarwal > wrote: > >> Do you get some error on passing the master option to your spark connect >> command? >> >> On Tue, 6 Aug, 2024, 15:36 Ilango, wrote: >> >>> >>> >>> >>> Thanks Prabodh. I'm having an issue with the Spark Connect connection as >>> the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas >>> the actual master node for our Spark standalone cluster is different. I am >>> passing that master node ip in the Spark Connect Connection. But still it >>> is not set correctly. Could you please help me update this configuration to >>> reflect the correct master node value? >>> >>> >>> >>> This is my spark connect connection >>> >>> >>> >>> spark = SparkSession.builder\ >>> >>> .remote("sc://:15002")\ >>> >>> .getOrCreate() >>> >>> >>> Thanks, >>> Elango >>> >>> >>> On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal >>> wrote: >>> >>>> There is an executors tab on spark connect. It's contents are generally >>>> similar to the workers section of the spark master ui. >>>> >>>> You might need to specify --master option in your spark connect command >>>> if you haven't done so yet. >>>> >>>> On Tue, 6 Aug, 2024, 14:19 Ilango, wrote: >>>> >>>>> >>>>> Hi all, >>>>> >>>>> I am evaluating the use of Spark Connect with my Spark stand-alone >>>>> cluster, which has a master node and 3 worker nodes. I have successfully >>>>> created a Spark Connect connection. However, when submitting Spark SQL >>>>> queries, the jobs are being executed only on the master node, and I do not >>>>> observe any executors running on the worker nodes, despite requesting 4 >>>>> executors. >>>>> >>>>> >>>>> >>>>> I would appreciate clarification on whether Spark stand-alone cluster >>>>> is supported for use with Spark Connect. >>>>> >>>>> If so, how can I leverage the existing Spark stand-alone cluster's >>>>> worker nodes? >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> Thanks, >>>>> Elango >>>>> >>>>
Re: [spark connect] unable to utilize stand alone cluster
Thanks Praboth. Passing —master attr in spark connect command worked like charm. I am able to submit spark connect to my existing stand-alone cluster Thanks for saving my day once again :) Thanks, Elango On Tue, 6 Aug 2024 at 6:08 PM, Prabodh Agarwal wrote: > Do you get some error on passing the master option to your spark connect > command? > > On Tue, 6 Aug, 2024, 15:36 Ilango, wrote: > >> >> >> >> Thanks Prabodh. I'm having an issue with the Spark Connect connection as >> the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas >> the actual master node for our Spark standalone cluster is different. I am >> passing that master node ip in the Spark Connect Connection. But still it >> is not set correctly. Could you please help me update this configuration to >> reflect the correct master node value? >> >> >> >> This is my spark connect connection >> >> >> >> spark = SparkSession.builder\ >> >> .remote("sc://:15002")\ >> >> .getOrCreate() >> >> >> Thanks, >> Elango >> >> >> On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal >> wrote: >> >>> There is an executors tab on spark connect. It's contents are generally >>> similar to the workers section of the spark master ui. >>> >>> You might need to specify --master option in your spark connect command >>> if you haven't done so yet. >>> >>> On Tue, 6 Aug, 2024, 14:19 Ilango, wrote: >>> >>>> >>>> Hi all, >>>> >>>> I am evaluating the use of Spark Connect with my Spark stand-alone >>>> cluster, which has a master node and 3 worker nodes. I have successfully >>>> created a Spark Connect connection. However, when submitting Spark SQL >>>> queries, the jobs are being executed only on the master node, and I do not >>>> observe any executors running on the worker nodes, despite requesting 4 >>>> executors. >>>> >>>> >>>> >>>> I would appreciate clarification on whether Spark stand-alone cluster >>>> is supported for use with Spark Connect. >>>> >>>> If so, how can I leverage the existing Spark stand-alone cluster's >>>> worker nodes? >>>> >>>> >>>> >>>> >>>> >>>> >>>> Thanks, >>>> Elango >>>> >>>
Re: Spark 3.5.0 bug - Writing a small paraquet dataframe to storage using spark 3.5.0 taking too long
Hi Spark community, Any resolution would be highly appreciated. Few additional analysis from my side: The lag in writing parquet exists in spark 3.5.0, but no lag in spark 3.1.2 or 2.4.5. Also, I found out that the task WholeStageCodeGen(1) --> ColumnarToRow is the one which is taking the most time (almost 3 mins for a simple 3 mb file) in spark 3.5.0. Input batch size of this stage is 10,and output record count is 30,000. The same CoumnarToRow task in spark 3.1.2 finishes in 10 secs. Further, with spark 3.5.0 if I cache the dataframe and materialise it using df.count() and then write the df into parquet file, then the ColumnarToRow gets called twice, first takes 10 secs and second one 3 mins. On Wed, 31 Jul, 2024, 10:14 PM Bijoy Deb, wrote: > Hi, > > We are using Spark on-premise to simply read a parquet file from > GCS(Google Cloud storage) into the DataFrame and write the DataFrame into > another folder in parquet format in GCS, using below code: > > > > DFS_BLOCKSIZE = 512 * 1024 * 1024 > > > spark = SparkSession.builder \ > .appName("test_app_parquet_load") \ > .config("spark.master", "spark://spark-master-svc:7077") \ > .config("spark.driver.maxResultSize", '1g') \ > .config("spark.driver.memory", '1g') \ > .config("spark.executor.cores",4) \ > .config("spark.sql.shuffle.partitions", 16) \ >.config("spark.sql.files.maxPartitionBytes", DFS_BLOCKSIZE) \ > > .getOrCreate() > > > folder="gs://input_folder/input1/key=20240610" > print(f"reading parquet from {folder}") > > start_time1 = time.time() > > data_df = spark.read.parquet(folder) > > end_time1 = time.time() > print(f"Time duration for reading parquet t1: {end_time1 - start_time1}") > > > start_time2 = time.time() > > data_df.write.mode("overwrite").parquet("gs://output_folder/output/key=20240610") > > end_time2 = time.time() > print(f"Time duration for writing parquet t3: {end_time2 - start_time2}") > > spark.stop() > > > > > > > > > ______ > > > However, we observed a drastic time difference between Spark 2.4.5 and > 3.5.0 in the writing process.Even in case of local filesystem instead of > gcs, spark 3.5.0 is taking long time. > > In Spark 2.4.5, the above code takes about 10 seconds for Parquet to read > and 20 seconds for write, while in Spark 3.5.0 read takes almost similar > time but write takes nearly 3 minutes. The size of the file is just 3 MB. > Further, we have noticed that if we read a CSV file instead of parquet into > DataFrame and write to another folder in parquet format, Spark 3.5.0 takes > relatively less time to write, about 30-40 seconds. > > So, it looks like only reading a parquet file to a dataframe and writing > that dataframe to another parquet file is taking too long in the case of > Spark 3.5.0. > > We are seeing that there is no slowness even with Spark 3.1.2. So, it > seems that the issue with spark job taking too long to write a parquet > based dataframe into another parquet file (in gcs or local filesystem both) > is specific to spark 3.5.0. Looks to be either a potential bug in Spark > 3.5.0 or some parquet related configuration that is not clearly documented. > Any help in this regard would be highly appreciated. > > > Thanks, > > Bijoy >
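Since the regression seems to sit in the ColumnarToRow step, one cheap diagnostic (not a fix) is to rerun the same read/write with the vectorized Parquet reader disabled and compare stage timings; if the slowdown disappears, that narrows the report to the vectorized Parquet read path in 3.5.0. A rough sketch, assuming the session from the quoted code; the output path suffix is a placeholder:

# Diagnostic only: toggle the vectorized Parquet reader and compare timings.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
data_df = spark.read.parquet("gs://input_folder/input1/key=20240610")
data_df.write.mode("overwrite").parquet("gs://output_folder/output_novec/key=20240610")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")   # restore the default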
Re: [spark connect] unable to utilize stand alone cluster
Do you get some error on passing the master option to your spark connect command? On Tue, 6 Aug, 2024, 15:36 Ilango, wrote: > > > > Thanks Prabodh. I'm having an issue with the Spark Connect connection as > the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas > the actual master node for our Spark standalone cluster is different. I am > passing that master node ip in the Spark Connect Connection. But still it > is not set correctly. Could you please help me update this configuration to > reflect the correct master node value? > > > > This is my spark connect connection > > > > spark = SparkSession.builder\ > > .remote("sc://:15002")\ > > .getOrCreate() > > > Thanks, > Elango > > > On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal > wrote: > >> There is an executors tab on spark connect. It's contents are generally >> similar to the workers section of the spark master ui. >> >> You might need to specify --master option in your spark connect command >> if you haven't done so yet. >> >> On Tue, 6 Aug, 2024, 14:19 Ilango, wrote: >> >>> >>> Hi all, >>> >>> I am evaluating the use of Spark Connect with my Spark stand-alone >>> cluster, which has a master node and 3 worker nodes. I have successfully >>> created a Spark Connect connection. However, when submitting Spark SQL >>> queries, the jobs are being executed only on the master node, and I do not >>> observe any executors running on the worker nodes, despite requesting 4 >>> executors. >>> >>> >>> >>> I would appreciate clarification on whether Spark stand-alone cluster is >>> supported for use with Spark Connect. >>> >>> If so, how can I leverage the existing Spark stand-alone cluster's >>> worker nodes? >>> >>> >>> >>> >>> >>> >>> Thanks, >>> Elango >>> >>
Re: [spark connect] unable to utilize stand alone cluster
Thanks Prabodh. I'm having an issue with the Spark Connect connection as the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas the actual master node for our Spark standalone cluster is different. I am passing that master node ip in the Spark Connect Connection. But still it is not set correctly. Could you please help me update this configuration to reflect the correct master node value? This is my spark connect connection spark = SparkSession.builder\ .remote("sc://:15002")\ .getOrCreate() Thanks, Elango On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal wrote: > There is an executors tab on spark connect. It's contents are generally > similar to the workers section of the spark master ui. > > You might need to specify --master option in your spark connect command if > you haven't done so yet. > > On Tue, 6 Aug, 2024, 14:19 Ilango, wrote: > >> >> Hi all, >> >> I am evaluating the use of Spark Connect with my Spark stand-alone >> cluster, which has a master node and 3 worker nodes. I have successfully >> created a Spark Connect connection. However, when submitting Spark SQL >> queries, the jobs are being executed only on the master node, and I do not >> observe any executors running on the worker nodes, despite requesting 4 >> executors. >> >> >> >> I would appreciate clarification on whether Spark stand-alone cluster is >> supported for use with Spark Connect. >> >> If so, how can I leverage the existing Spark stand-alone cluster's worker >> nodes? >> >> >> >> >> >> >> Thanks, >> Elango >> >
Re: [spark connect] unable to utilize stand alone cluster
There is an Executors tab on the Spark Connect UI. Its contents are generally similar to the Workers section of the Spark master UI. You might need to specify the --master option in your spark connect command if you haven't done so yet. On Tue, 6 Aug, 2024, 14:19 Ilango, wrote: > > Hi all, > > I am evaluating the use of Spark Connect with my Spark stand-alone > cluster, which has a master node and 3 worker nodes. I have successfully > created a Spark Connect connection. However, when submitting Spark SQL > queries, the jobs are being executed only on the master node, and I do not > observe any executors running on the worker nodes, despite requesting 4 > executors. > > > > I would appreciate clarification on whether Spark stand-alone cluster is > supported for use with Spark Connect. > > If so, how can I leverage the existing Spark stand-alone cluster's worker > nodes? > > > > > > > Thanks, > Elango >
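For reference, the start command that this thread eventually converges on looks roughly like the following; the master URL is a placeholder for the standalone master's spark:// address, and the package coordinate matches the one used elsewhere in these threads:

./sbin/start-connect-server.sh \
  --master spark://<master-host>:7077 \
  --packages org.apache.spark:spark-connect_2.12:3.5.1

Without --master, the Connect server falls back to a local master (hence the local[*] shown in the UI) and never schedules work on the standalone workers.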
[spark connect] unable to utilize stand alone cluster
Hi all, I am evaluating the use of Spark Connect with my Spark stand-alone cluster, which has a master node and 3 worker nodes. I have successfully created a Spark Connect connection. However, when submitting Spark SQL queries, the jobs are being executed only on the master node, and I do not observe any executors running on the worker nodes, despite requesting 4 executors. I would appreciate clarification on whether Spark stand-alone cluster is supported for use with Spark Connect. If so, how can I leverage the existing Spark stand-alone cluster's worker nodes? Thanks, Elango
Spark History CORS header ‘Access-Control-Allow-Origin’ missing
Hello, I'm trying to access the Spark History UI through an Apache Knox proxy but I get the following error: Cross-Origin Request Blocked: The Same Origin Policy disallows reading the remote resource at https://ithdpdev-ekleszcz01.cern.ch:18080/api/v1/applications?limit=2147483647&status=completed. (Reason: CORS header ‘Access-Control-Allow-Origin’ missing). Status code: 302. I can't find anything related to CORS in Spark; do you know how I could fix that? Someone suggested the following parameters, but they didn't work and I can't manage to find any reference to them online:
spark.hadoop.http.cross-origin.allowed-origins = *
spark.hadoop.http.cross-origin.allowed-methods = GET, PUT, POST, OPTIONS, HEAD, DELETE
spark.hadoop.http.cross-origin.allowed-headers = X-Requested-With, Content-Type, Accept, Origin, WWW-Authenticate, Accept-Encoding, Transfer-Encoding
spark.hadoop.http.cross-origin.max-age = 180
Thanks for your answer
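For what it's worth, the spark.hadoop.http.cross-origin.* properties are Hadoop daemon web UI settings, so they would not be picked up by the History Server's own Jetty UI. One avenue sometimes used instead is Spark's servlet filter mechanism with Hadoop's CrossOriginFilter; this is only a sketch for the history server's spark-defaults.conf, assuming hadoop-common is on its classpath, and it has not been verified behind Knox:

spark.ui.filters org.apache.hadoop.security.http.CrossOriginFilter
spark.org.apache.hadoop.security.http.CrossOriginFilter.param.allowed-origins *
spark.org.apache.hadoop.security.http.CrossOriginFilter.param.allowed-methods GET,POST,HEAD
spark.org.apache.hadoop.security.http.CrossOriginFilter.param.allowed-headers X-Requested-With,Content-Type,Accept,Origin

The 302 status also suggests the API call is being redirected (for example to a login endpoint) rather than only blocked by CORS, so it may be worth checking what the redirect target is before tuning headers.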
Re: [Issue] Spark SQL - broadcast failure
Hi all, Do we have any idea on this. Thanks On Tue, 23 Jul, 2024, 12:54 pm Sudharshan V, wrote: > We removed the explicit broadcast for that particular table and it took > longer time since the join type changed from BHJ to SMJ. > > I wanted to understand how I can find what went wrong with the broadcast > now. > How do I know the size of the table inside of spark memory. > > I have tried to cache the table hoping I could see the table size in the > storage tab of spark UI of EMR. > > But I see no data there . > > Thanks > > On Tue, 23 Jul, 2024, 12:48 pm Sudharshan V, > wrote: > >> Hi all, apologies for the delayed response. >> >> We are using spark version 3.4.1 in jar and EMR 6.11 runtime. >> >> We have disabled the auto broadcast always and would broadcast the >> smaller tables using explicit broadcast. >> >> It was working fine historically and only now it is failing. >> >> The data sizes I mentioned was taken from S3. >> >> Thanks, >> Sudharshan >> >> On Wed, 17 Jul, 2024, 1:53 am Meena Rajani, >> wrote: >> >>> Can you try disabling broadcast join and see what happens? >>> >>> On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V >>> wrote: >>> >>>> Hi all, >>>> >>>> Been facing a weird issue lately. >>>> In our production code base , we have an explicit broadcast for a small >>>> table. >>>> It is just a look up table that is around 1gb in size in s3 and just >>>> had few million records and 5 columns. >>>> >>>> The ETL was running fine , but with no change from the codebase nor the >>>> infrastructure, we are getting broadcast failures. Even weird fact is the >>>> older size of the data is 1.4gb while for the new run is just 900 MB >>>> >>>> Below is the error message >>>> Cannot broadcast table that is larger than 8 GB : 8GB. >>>> >>>> I find it extremely weird considering that the data size is very well >>>> under the thresholds. >>>> >>>> Are there any other ways to find what could be the issue and how we can >>>> rectify this issue? >>>> >>>> Could the data characteristics be an issue? >>>> >>>> Any help would be immensely appreciated. >>>> >>>> Thanks >>>> >>>
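One way to dig into what went wrong is to check the size the optimizer thinks the lookup table has, since the 8 GB broadcast limit is compared against that estimate rather than against the size of the files on S3. A rough sketch (the table name is a placeholder; the first call goes through an internal accessor, so treat it as a debugging aid rather than a stable API):

df = spark.table("lookup_table")
# Optimizer's size estimate in bytes (internal API via py4j):
print(df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes())

# The same estimate is printed in the plan statistics:
spark.sql("EXPLAIN COST SELECT * FROM lookup_table").show(truncate=False)

A compressed ~900 MB table on S3 can expand to several gigabytes once decompressed into in-memory rows for the broadcast, which is one common way to hit the 8 GB cap despite a small on-disk footprint.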
A code change for spark ui in Sql tab
Hi Community, I'm new to the Spark community and noticed a gap between Spark and DBR in the Spark UI. DBR shows cost-based optimizer statistics in the SQL tab of the Spark UI: https://docs.databricks.com/en/optimizations/cbo.html#spark-sql-ui. To implement something similar in open source Spark, I've opened https://github.com/apache/spark/pull/47534. I've searched JIRA but found nothing useful, so I'm writing to confirm whether such functionality is already supported in open source Spark, and what I should do if I want to get this change checked in. Thanks, Weihan Tang
Re: [Spark Connect] connection issue
Glad it worked! On Tue, 30 Jul, 2024, 11:12 Ilango, wrote: > > Thanks Prabodh. I copied the spark connect jar to $SPARK_HOME/jars > folder. And passed the location as —jars attr. Its working now. I could > submit spark jobs via spark connect. > > Really appreciate the help. > > > > Thanks, > Elango > > > On Tue, 30 Jul 2024 at 11:05 AM, Prabodh Agarwal > wrote: > >> Yeah. I understand the problem. One of the ways is to actually place the >> spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark >> connect. Using the `--packages` or the `--jars` option is flaky in case of >> spark connect. >> >> You can instead manually place the relevant spark connect jar file in the >> `$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars` >> option from your start command. >> >> On Mon, Jul 29, 2024 at 7:01 PM Ilango wrote: >> >>> >>> Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs >>> path. It seems like the spark connect dependency issue. My spark node is >>> air gapped node so no internet is allowed. Can I download the spark connect >>> jar and pom files locally and share the local paths? How can I share the >>> local jars ? >>> >>> Error message: >>> >>> :: problems summary :: >>> >>> WARNINGS >>> >>> module not found: >>> org.apache.spark#spark-connect_2.12;3.5.1 >>> >>> >>> >>> local-m2-cache: tried >>> >>> >>> >>> >>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >>> >>> >>> >>> -- artifact >>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: >>> >>> >>> >>> >>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar >>> >>> >>> >>> local-ivy-cache: tried >>> >>> >>> >>> >>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml >>> >>> >>> >>> -- artifact >>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: >>> >>> >>> >>> >>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar >>> >>> >>> >>> central: tried >>> >>> >>> >>> >>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >>> >>> >>> >>> -- artifact >>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: >>> >>> >>> >>> >>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar >>> >>> >>> >>> spark-packages: tried >>> >>> >>> >>> >>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >>> >>> >>> >>> -- artifact >>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: >>> >>> >>> >>> >>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar >>> >>> >>> >>> :::::: >>> >>> >>> >>> :: UNRESOLVED DEPENDENCIES :: >>> >>> >>> >>> :::::: >>> >>> >>> >>> :: org.apache.spark#spark-connect_2.12;3.5.1: not found >>> >>> >>> >>> :: >>> >>> >>> >>> >>> >>> ERRORS >>> >>> Server access error at url >>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >>> (java.net.ConnectException: >>> Connection timed out (Connection timed out)) >>> >>> >>> >>> Server access error at url >>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-co
Re: [Spark Connect] connection issue
Thanks Prabodh. I copied the spark connect jar to $SPARK_HOME/jars folder. And passed the location as —jars attr. Its working now. I could submit spark jobs via spark connect. Really appreciate the help. Thanks, Elango On Tue, 30 Jul 2024 at 11:05 AM, Prabodh Agarwal wrote: > Yeah. I understand the problem. One of the ways is to actually place the > spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark > connect. Using the `--packages` or the `--jars` option is flaky in case of > spark connect. > > You can instead manually place the relevant spark connect jar file in the > `$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars` > option from your start command. > > On Mon, Jul 29, 2024 at 7:01 PM Ilango wrote: > >> >> Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs >> path. It seems like the spark connect dependency issue. My spark node is >> air gapped node so no internet is allowed. Can I download the spark connect >> jar and pom files locally and share the local paths? How can I share the >> local jars ? >> >> Error message: >> >> :: problems summary :: >> >> WARNINGS >> >> module not found: >> org.apache.spark#spark-connect_2.12;3.5.1 >> >> >> >> local-m2-cache: tried >> >> >> >> >> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >> >> >> >> -- artifact >> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: >> >> >> >> >> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar >> >> >> >> local-ivy-cache: tried >> >> >> >> >> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml >> >> >> >> -- artifact >> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: >> >> >> >> >> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar >> >> >> >> central: tried >> >> >> >> >> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >> >> >> >> -- artifact >> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: >> >> >> >> >> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar >> >> >> >> spark-packages: tried >> >> >> >> >> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >> >> >> >> -- artifact >> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: >> >> >> >> >> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar >> >> >> >> :: >> >> >> >> :: UNRESOLVED DEPENDENCIES :: >> >> >> >> :: >> >> >> >> :: org.apache.spark#spark-connect_2.12;3.5.1: not found >> >> >> >> :: >> >> >> >> >> >> ERRORS >> >> Server access error at url >> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >> (java.net.ConnectException: >> Connection timed out (Connection timed out)) >> >> >> >> Server access error at url >> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException: >> Connection timed out (Connection timed out)) >> >> >> >> Server access error at url >> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom >> (java.net.ConnectException: >> Connection timed out (Connection timed out)) >> >> >> >> Server access error at url >> 
https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException: >> Connection timed out (Connection timed out)) >> >> >> >> >> >
Re: Question about installing Apache Spark [PySpark] computer requirements
You probably have to increase jvm/jdk memory size https://stackoverflow.com/questions/1565388/increase-heap-size-in-java On Mon, Jul 29, 2024 at 9:36 PM mike Jadoo wrote: > Thanks. I just downloaded the corretto but I got this error message, > which was the same as before. [It was shared with me that this saying that > I have limited resources, i think] > > ---Py4JJavaError > Traceback (most recent call last) > Cell In[3], line 13 8 squared_rdd = rdd.map(lambda x: x * x) 10 # > Persist the DataFrame in memory 11 > #squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the results > into a list---> 13 result = squared_rdd.collect() 15 # Print the result > 16 print(result) > > File > C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833, > in RDD.collect(self) 1831 with SCCallSiteSync(self.context): 1832 > assert self.ctx._jvm is not None-> 1833 sock_info = > self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 1834 return > list(_load_from_socket(sock_info, self._jrdd_deserializer)) > > File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in > JavaMember.__call__(self, *args) 1316 command = proto.CALL_COMMAND_NAME +\ > 1317 self.command_header +\ 1318 args_command +\ 1319 > proto.END_COMMAND_PART 1321 answer = > self.gateway_client.send_command(command)-> 1322 return_value = > get_return_value( 1323 answer, self.gateway_client, self.target_id, > self.name) 1325 for temp_arg in temp_args: 1326 if hasattr(temp_arg, > "_detach"): > > File > C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179, > in capture_sql_exception..deco(*a, **kw)177 def deco(*a: Any, > **kw: Any) -> Any:178 try:--> 179 return f(*a, **kw)180 > except Py4JJavaError as e:181 converted = > convert_exception(e.java_exception) > > File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in > get_return_value(answer, gateway_client, target_id, name)324 value = > OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if answer[1] == > REFERENCE_TYPE:--> 326 raise Py4JJavaError(327 "An error > occurred while calling {0}{1}{2}.\n".328 format(target_id, ".", > name), value)329 else:330 raise Py4JError(331 "An > error occurred while calling {0}{1}{2}. Trace:\n{3}\n".332 > format(target_id, ".", name, value)) > Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 > in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 > (TID 7) (mjadoo.myfiosgateway.com executor driver): java.io.IOException: > Cannot run program "C:\Users\mikej\AppData\Local\Programs\Python\Python312": > CreateProcess error=5, Access is denied > at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128) > at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071) > at > org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181) > at > org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109) > at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124) > at > org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174) > at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) > at > org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) > at org.apache.spark.scheduler.Task.run(Task.scala:141) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) > at > org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) > at > org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(T
Re: [Spark Connect] connection issue
Yeah. I understand the problem. One of the ways is to actually place the spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark connect. Using the `--packages` or the `--jars` option is flaky in case of spark connect. You can instead manually place the relevant spark connect jar file in the `$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars` option from your start command. On Mon, Jul 29, 2024 at 7:01 PM Ilango wrote: > > Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs > path. It seems like the spark connect dependency issue. My spark node is > air gapped node so no internet is allowed. Can I download the spark connect > jar and pom files locally and share the local paths? How can I share the > local jars ? > > Error message: > > :: problems summary :: > > WARNINGS > > module not found: org.apache.spark#spark-connect_2.12;3.5.1 > > > > local-m2-cache: tried > > > > > file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom > > > > -- artifact > org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: > > > > > file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar > > > > local-ivy-cache: tried > > > > > /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml > > > > -- artifact > org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: > > > > > /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar > > > > central: tried > > > > > https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom > > > > -- artifact > org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: > > > > > https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar > > > > spark-packages: tried > > > > > https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom > > > > -- artifact > org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: > > > > > https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar > > > > :: > > > > :: UNRESOLVED DEPENDENCIES :: > > > > :::::: > > > > :: org.apache.spark#spark-connect_2.12;3.5.1: not found > > > > :: > > > > > > :::: ERRORS > > Server access error at url > https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom > (java.net.ConnectException: > Connection timed out (Connection timed out)) > > > > Server access error at url > https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException: > Connection timed out (Connection timed out)) > > > > Server access error at url > https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom > (java.net.ConnectException: > Connection timed out (Connection timed out)) > > > > Server access error at url > https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException: > Connection timed out (Connection timed out)) > > > > > > :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS > > Exception in thread "main" java.lang.RuntimeException: [unresolved > dependency: org.apache.spark#spark-connect_2.12;3.5.1: not found] > > at > org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1608) > > at > 
org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185) > > at > org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334) > > at org.apache.spark.deploy.SparkSubmit.org > $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964) > > at > org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194) > > at > org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217) > > at > org.apache.spark.deploy.Spark
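For the air-gapped node, the jar can be fetched once on a machine that does have internet access, using the same Maven Central URL that appears in the resolver output above, and then copied into $SPARK_HOME/jars so that no --packages resolution is needed at startup. A sketch (paths are placeholders):

# On a machine with internet access:
curl -O https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar

# After transferring the jar to the air-gapped node:
cp spark-connect_2.12-3.5.1.jar $SPARK_HOME/jars/
./sbin/start-connect-server.sh   # no --packages/--jars needed once the jar is in place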
Re: Question about installing Apache Spark [PySpark] computer requirements
Hi Mike, This appears to be an access issue on Windows + Python. Can you try setting up the PYTHON_PATH environment variable as described in this stackoverflow post https://stackoverflow.com/questions/60414394/createprocess-error-5-access-is-denied-pyspark - Sadha On Mon, Jul 29, 2024 at 3:39 PM mike Jadoo wrote: > Thanks. I just downloaded the corretto but I got this error message, > which was the same as before. [It was shared with me that this saying that > I have limited resources, i think] > > ---Py4JJavaError > Traceback (most recent call last) > Cell In[3], line 13 8 squared_rdd = rdd.map(lambda x: x * x) 10 # > Persist the DataFrame in memory 11 > #squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the results > into a list---> 13 result = squared_rdd.collect() 15 # Print the result > 16 print(result) > > File > C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833, > in RDD.collect(self) 1831 with SCCallSiteSync(self.context): 1832 > assert self.ctx._jvm is not None-> 1833 sock_info = > self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 1834 return > list(_load_from_socket(sock_info, self._jrdd_deserializer)) > > File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in > JavaMember.__call__(self, *args) 1316 command = proto.CALL_COMMAND_NAME +\ > 1317 self.command_header +\ 1318 args_command +\ 1319 > proto.END_COMMAND_PART 1321 answer = > self.gateway_client.send_command(command)-> 1322 return_value = > get_return_value( 1323 answer, self.gateway_client, self.target_id, > self.name) 1325 for temp_arg in temp_args: 1326 if hasattr(temp_arg, > "_detach"): > > File > C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179, > in capture_sql_exception..deco(*a, **kw)177 def deco(*a: Any, > **kw: Any) -> Any:178 try:--> 179 return f(*a, **kw)180 > except Py4JJavaError as e:181 converted = > convert_exception(e.java_exception) > > File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in > get_return_value(answer, gateway_client, target_id, name)324 value = > OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if answer[1] == > REFERENCE_TYPE:--> 326 raise Py4JJavaError(327 "An error > occurred while calling {0}{1}{2}.\n".328 format(target_id, ".", > name), value)329 else:330 raise Py4JError(331 "An > error occurred while calling {0}{1}{2}. Trace:\n{3}\n".332 > format(target_id, ".", name, value)) > Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 > in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 > (TID 7) (mjadoo.myfiosgateway.com executor driver): java.io.IOException: > Cannot run program "C:\Users\mikej\AppData\Local\Programs\Python\Python312": > CreateProcess error=5, Access is denied > at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128) > at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071) > at > org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181) > at > org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109) > at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124) > at > org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174) > at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) > at > org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) > at org.apache.spark.scheduler.Task.run(Task.scala:141) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) > at > org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) > at > org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) > at > java.base/java.util.concurrent.
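The path in that CreateProcess error points at the Python installation directory rather than at python.exe, which is consistent with the worker executable setting being a folder. A hypothetical workaround sketch is to point PYSPARK_PYTHON at the interpreter itself before the session is created; the path below is simply the one from the error message with python.exe appended, so adjust it to the real location:

import os

# Must be set before the SparkSession/SparkContext is created.
os.environ["PYSPARK_PYTHON"] = r"C:\Users\mikej\AppData\Local\Programs\Python\Python312\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = os.environ["PYSPARK_PYTHON"]

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
print(spark.sparkContext.parallelize(range(10)).map(lambda x: x * x).collect())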
Re: Question about installing Apache Spark [PySpark] computer requirements
Thanks. I just downloaded the corretto but I got this error message, which was the same as before. [It was shared with me that this saying that I have limited resources, i think] ---Py4JJavaError Traceback (most recent call last) Cell In[3], line 13 8 squared_rdd = rdd.map(lambda x: x * x) 10 # Persist the DataFrame in memory 11 #squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the results into a list---> 13 result = squared_rdd.collect() 15 # Print the result 16 print(result) File C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833, in RDD.collect(self) 1831 with SCCallSiteSync(self.context): 1832 assert self.ctx._jvm is not None-> 1833 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 1834 return list(_load_from_socket(sock_info, self._jrdd_deserializer)) File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args) 1316 command = proto.CALL_COMMAND_NAME +\ 1317 self.command_header +\ 1318 args_command +\ 1319 proto.END_COMMAND_PART 1321 answer = self.gateway_client.send_command(command)-> 1322 return_value = get_return_value( 1323 answer, self.gateway_client, self.target_id, self.name) 1325 for temp_arg in temp_args: 1326 if hasattr(temp_arg, "_detach"): File C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179, in capture_sql_exception..deco(*a, **kw)177 def deco(*a: Any, **kw: Any) -> Any:178 try:--> 179 return f(*a, **kw)180 except Py4JJavaError as e:181 converted = convert_exception(e.java_exception) File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if answer[1] == REFERENCE_TYPE:--> 326 raise Py4JJavaError(327 "An error occurred while calling {0}{1}{2}.\n".328 format(target_id, ".", name), value)329 else:330 raise Py4JError(331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".332 format(target_id, ".", name, value)) Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7) (mjadoo.myfiosgateway.com executor driver): java.io.IOException: Cannot run program "C:\Users\mikej\AppData\Local\Programs\Python\Python312": CreateProcess error=5, Access is denied at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128) at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071) at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124) at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) at org.apache.spark.scheduler.Task.run(Task.scala:141) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.IOException: CreateProcess error=5, Access is denied at java.base/java.lang.ProcessImpl.create(Native Method) at java.base/java.lang.ProcessImpl.(ProcessImpl.java:492) at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:153) at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107) ... 19 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2
Re: Question about installing Apache Spark [PySpark] computer requirements
Hi Mike, I'm not sure about the minimum requirements of a machine for running Spark. But to run some PySpark scripts (and Jupyter notebooks) on a local machine, I found the following steps the easiest. I installed Amazon Corretto and updated the JAVA_HOME variable as instructed here: https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list.html (any other Java works too; I'm used to Corretto from work). Then I installed the PySpark module using pip, which enabled me to run PySpark on my machine. -Sadha On Mon, Jul 29, 2024, 12:51 PM mike Jadoo wrote: > Hello, > > I am trying to run Pyspark on my computer without success. I follow > several different directions from online sources and it appears that I need > to get a faster computer. > > I wanted to ask what are some recommendations for computer specifications > to run PySpark (Apache Spark). > > Any help would be greatly appreciated. > > Thank you, > > Mike >
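Condensed, the local setup described above amounts to something like the following (Linux/macOS shell shown; the Corretto path is a placeholder, and on Windows the equivalent is setting the variables in the system environment):

export JAVA_HOME=/path/to/amazon-corretto-11   # wherever Corretto was installed
export PATH="$JAVA_HOME/bin:$PATH"
pip install pyspark                            # bundles the Spark runtime and py4j
python -c "from pyspark.sql import SparkSession; print(SparkSession.builder.master('local[*]').getOrCreate().range(5).count())"

A couple of CPU cores and a few gigabytes of RAM are generally enough for local[*] experiments on small data; the error quoted earlier in this thread looks more like a Python executable configuration problem than a hardware limit.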
Re: [Spark Connect] connection issue
Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs path. It seems like the spark connect dependency issue. My spark node is air gapped node so no internet is allowed. Can I download the spark connect jar and pom files locally and share the local paths? How can I share the local jars ? Error message: :: problems summary :: WARNINGS module not found: org.apache.spark#spark-connect_2.12;3.5.1 local-m2-cache: tried file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom -- artifact org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar local-ivy-cache: tried /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml -- artifact org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar central: tried https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom -- artifact org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar spark-packages: tried https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom -- artifact org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar: https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar :: :: UNRESOLVED DEPENDENCIES :: :: :: org.apache.spark#spark-connect_2.12;3.5.1: not found :: ERRORS Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom (java.net.ConnectException: Connection timed out (Connection timed out)) Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException: Connection timed out (Connection timed out)) Server access error at url https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom (java.net.ConnectException: Connection timed out (Connection timed out)) Server access error at url https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException: Connection timed out (Connection timed out)) :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: org.apache.spark#spark-connect_2.12;3.5.1: not found] at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1608) at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185) at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334) at org.apache.spark.deploy.SparkSubmit.org $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Thanks, Elango On Mon, 29 Jul 2024 at 6:45 PM, Prabodh Agarwal wrote: > 
The spark connect startup prints the log location. Is that not feasible > for you? > For me log comes to $SPARK_HOME/logs > > On Mon, 29 Jul, 2024, 15:30 Ilango, wrote: > >> >> Hi all, >> >> >> I am facing issues with a Spark Connect application running on a Spark >> standalone cluster (without YARN and HDFS). After executing the >> start-connect-server.sh script with the specified packages, I observe a >> process ID for a short period but am unable to see the corresponding port >> (default 15002) associated with that PID. The process automatically stops >> after around 10 minutes. >> >> Since the Spark History server is not enabled, I am unable to locate the >> relevant logs or error messages. The logs for currently running Spark >> applicatio
Question about installing Apache Spark [PySpark] computer requirements
Hello, I am trying to run PySpark on my computer without success. I have followed several different sets of directions from online sources, and it appears that I need to get a faster computer. I wanted to ask what computer specifications are recommended for running PySpark (Apache Spark). Any help would be greatly appreciated. Thank you, Mike
Re: [Spark Connect] connection issue
The spark connect startup prints the log location. Is that not feasible for you? For me log comes to $SPARK_HOME/logs On Mon, 29 Jul, 2024, 15:30 Ilango, wrote: > > Hi all, > > > I am facing issues with a Spark Connect application running on a Spark > standalone cluster (without YARN and HDFS). After executing the > start-connect-server.sh script with the specified packages, I observe a > process ID for a short period but am unable to see the corresponding port > (default 15002) associated with that PID. The process automatically stops > after around 10 minutes. > > Since the Spark History server is not enabled, I am unable to locate the > relevant logs or error messages. The logs for currently running Spark > applications are accessible from the Spark UI, but I am unsure where to > find the logs for the Spark Connect application and service. > > Could you please advise on where to find the logs or error messages > related to Spark Connect? > > > > > Thanks, > Elango >
[Spark Connect] connection issue
Hi all, I am facing issues with a Spark Connect application running on a Spark standalone cluster (without YARN and HDFS). After executing the start-connect-server.sh script with the specified packages, I observe a process ID for a short period but am unable to see the corresponding port (default 15002) associated with that PID. The process automatically stops after around 10 minutes. Since the Spark History server is not enabled, I am unable to locate the relevant logs or error messages. The logs for currently running Spark applications are accessible from the Spark UI, but I am unsure where to find the logs for the Spark Connect application and service. Could you please advise on where to find the logs or error messages related to Spark Connect? Thanks, Elango
Re: [Issue] Spark SQL - broadcast failure
We removed the explicit broadcast for that particular table and it took longer time since the join type changed from BHJ to SMJ. I wanted to understand how I can find what went wrong with the broadcast now. How do I know the size of the table inside of spark memory. I have tried to cache the table hoping I could see the table size in the storage tab of spark UI of EMR. But I see no data there . Thanks On Tue, 23 Jul, 2024, 12:48 pm Sudharshan V, wrote: > Hi all, apologies for the delayed response. > > We are using spark version 3.4.1 in jar and EMR 6.11 runtime. > > We have disabled the auto broadcast always and would broadcast the smaller > tables using explicit broadcast. > > It was working fine historically and only now it is failing. > > The data sizes I mentioned was taken from S3. > > Thanks, > Sudharshan > > On Wed, 17 Jul, 2024, 1:53 am Meena Rajani, > wrote: > >> Can you try disabling broadcast join and see what happens? >> >> On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V >> wrote: >> >>> Hi all, >>> >>> Been facing a weird issue lately. >>> In our production code base , we have an explicit broadcast for a small >>> table. >>> It is just a look up table that is around 1gb in size in s3 and just had >>> few million records and 5 columns. >>> >>> The ETL was running fine , but with no change from the codebase nor the >>> infrastructure, we are getting broadcast failures. Even weird fact is the >>> older size of the data is 1.4gb while for the new run is just 900 MB >>> >>> Below is the error message >>> Cannot broadcast table that is larger than 8 GB : 8GB. >>> >>> I find it extremely weird considering that the data size is very well >>> under the thresholds. >>> >>> Are there any other ways to find what could be the issue and how we can >>> rectify this issue? >>> >>> Could the data characteristics be an issue? >>> >>> Any help would be immensely appreciated. >>> >>> Thanks >>> >>
Re: [Issue] Spark SQL - broadcast failure
Hi all, apologies for the delayed response. We are using spark version 3.4.1 in jar and EMR 6.11 runtime. We have disabled the auto broadcast always and would broadcast the smaller tables using explicit broadcast. It was working fine historically and only now it is failing. The data sizes I mentioned was taken from S3. Thanks, Sudharshan On Wed, 17 Jul, 2024, 1:53 am Meena Rajani, wrote: > Can you try disabling broadcast join and see what happens? > > On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V > wrote: > >> Hi all, >> >> Been facing a weird issue lately. >> In our production code base , we have an explicit broadcast for a small >> table. >> It is just a look up table that is around 1gb in size in s3 and just had >> few million records and 5 columns. >> >> The ETL was running fine , but with no change from the codebase nor the >> infrastructure, we are getting broadcast failures. Even weird fact is the >> older size of the data is 1.4gb while for the new run is just 900 MB >> >> Below is the error message >> Cannot broadcast table that is larger than 8 GB : 8GB. >> >> I find it extremely weird considering that the data size is very well >> under the thresholds. >> >> Are there any other ways to find what could be the issue and how we can >> rectify this issue? >> >> Could the data characteristics be an issue? >> >> Any help would be immensely appreciated. >> >> Thanks >> >
[Spark SQL]: Why the OptimizeSkewedJoin rule does not optimize FullOuterJoin?
Hi, I am a beginner in Spark and currently learning the Spark source code. I have a question about the AQE rule OptimizeSkewedJoin. I have a SQL query using SMJ FullOuterJoin, where there is read skew on the left side (the case is mentioned below). case: remote bytes read total (min, med, max) 90.5 GiB [bytes:97189264140] (208.5 MiB [bytes:218673776], 210.0 MiB [bytes:220191607], 18.1 GiB [bytes:19467332173]) However, the OptimizeSkewedJoin rule does not optimize FullOuterJoin. I would like to know the reason behind this. Thanks.
[spark connect] issue in testing spark connect
Hi all, I am currently using a Spark standalone cluster, which is functioning as expected. Users are able to connect to the cluster and submit jobs without any issues. I am also testing the Spark Connect capability, which will enable external clients to submit jobs to the cluster. To start the Spark Connect server, I am running the command `/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1` on the Spark master node. The command executes without any errors, suggesting that the Spark Connect server is running successfully. However, I am unable to access the Spark UI at to verify the Spark Connect server's status. Can someone please provide guidance on how to confirm that Spark Connect is functioning properly? Additionally, when attempting to run a Spark code snippet in Jupyter Notebook to test Spark Connect, I encounter the following error. If someone is familiar with this issue or could provide assistance in resolving it, I would greatly appreciate it. import pyspark import pandas import pyarrow import grpc_status import grpc import torch from pyspark.sql import SparkSession import os os.environ["SPARK_HOME"] = "/path/to/my/standalone/pyspark/cluster" spark = SparkSession.builder.remote("sc://").getOrCreate() #spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate() Error : /pyspark/sql/connect/session.py:185: UserWarning: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "failed to connect to all addresses; last error: UNKNOWN: ipv4::15002: Failed to connect to remote host: Connection refused" debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"failed to connect to all addresses; last error: UNKNOWN: ipv4::15002: Failed to connect to remote host: Connection refused", grpc_status:14, created_time:"2024-07-19T14:48:49.12739279+08:00"}" warnings.warn(str(e)) Thanks, Elango
Error on Spark history UI api when used with Apache Knox
Hello, I am securing my Spark history UI with Apache Knox and am having trouble with the proxied Spark history UI, as mentioned in this thread: https://lists.apache.org/thread/xwzo4g6qk2hqmn62t6p162b8t6l3hldq I managed to edit the rewrite rules to make the CSS load on the proxied Spark history UI, but I am now facing another problem that I can't manage to overcome. When the page loads, the JS code triggers a jQuery call to the API: `$.getJSON(uiRoot + "/api/v1/applications", appParams` The problem is that when Knox is proxying the Spark history UI this uiRoot is not the right one: it's https://knox-host:8443/api/v1/applications instead of https://spark-ui-host:18080/api/v1/applications Apache Knox rewrite rules don't seem to be able to fix that, so this is why I am sending a message here to see if any of you have a better idea, maybe something related to the proxy setup. I tried to set `spark.ui.proxyBase` to the Spark History UI URL and port so that Knox tries to reach it, but I end up with a 302 error `Cross-Origin Request Blocked: The Same Origin Policy disallows reading the remote resource at https://host:18080/api/v1/applications?limit=2147483647&status=completed. (Reason: CORS header ‘Access-Control-Allow-Origin’ missing). Status code: 302.`, and I don't see anything related to CORS in the SHS configuration. I would love to have any suggestion on why the Spark rewrite rules don't work out of the box on Knox or how I could fix that API call. Thank you very much for your responses - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
problem using spark 3.4 with spot instances
Hello, I am working with Spark on Dataiku using spot instances (not on-demand instances) with data on S3. I had no problem until I moved from Spark 3.3 to Spark 3.4! I always get this failure and could not figure out which configuration change in the new version of Spark leads to it:
*org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=2, partition=357) failed; but task commit success, data duplication may happen.*
Here is the full log
[09:46:42] [INFO] [dku.utils] - [2024/07/17-09:46:42.680] [Thread-6] [ERROR] [org.apache.spark.sql.execution.datasources.FileFormatWriter] - Aborting job 2271622d-848a-4719-ad86-81d951235dbb.
[09:46:42] [INFO] [dku.utils] - org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=2, partition=715) failed; but task commit success, data duplication may happen. reason=ExecutorLostFailure(1,false,Some(The executor with id 1 was deleted by a user or the framework.))
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.17.jar:?]
[09:46:42] [INFO] [dku.utils] -at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.17.jar:?]
[09:46:42] [INFO] [dku.utils] -at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.17.jar:?]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleStageFailed$1(DAGScheduler.scala:1199) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleStageFailed$1$adapted(DAGScheduler.scala:1199) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.17.jar:?]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGScheduler.handleStageFailed(DAGScheduler.scala:1199) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2981) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263) ~[spark-core_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111) ~[spark-sql_2.12-3.4.1.jar:3.4.1]
[09:46:42] [INFO] [dku.utils] -at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125
Re: [Issue] Spark SQL - broadcast failure
Can you try disabling broadcast join and see what happens? On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V wrote: > Hi all, > > Been facing a weird issue lately. > In our production code base , we have an explicit broadcast for a small > table. > It is just a look up table that is around 1gb in size in s3 and just had > few million records and 5 columns. > > The ETL was running fine , but with no change from the codebase nor the > infrastructure, we are getting broadcast failures. Even weird fact is the > older size of the data is 1.4gb while for the new run is just 900 MB > > Below is the error message > Cannot broadcast table that is larger than 8 GB : 8GB. > > I find it extremely weird considering that the data size is very well > under the thresholds. > > Are there any other ways to find what could be the issue and how we can > rectify this issue? > > Could the data characteristics be an issue? > > Any help would be immensely appreciated. > > Thanks >
Re: [Issue] Spark SQL - broadcast failure
It will help if you mention the Spark version and the piece of problematic code HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 16 Jul 2024 at 08:51, Sudharshan V wrote: > > On Mon, 8 Jul, 2024, 7:53 pm Sudharshan V, > wrote: > >> Hi all, >> >> Been facing a weird issue lately. >> In our production code base , we have an explicit broadcast for a small >> table. >> It is just a look up table that is around 1gb in size in s3 and just had >> few million records and 5 columns. >> >> The ETL was running fine , but with no change from the codebase nor the >> infrastructure, we are getting broadcast failures. Even weird fact is the >> older size of the data is 1.4gb while for the new run is just 900 MB >> >> Below is the error message >> Cannot broadcast table that is larger than 8 GB : 8GB. >> >> I find it extremely weird considering that the data size is very well >> under the thresholds. >> >> Are there any other ways to find what could be the issue and how we can >> rectify this issue? >> >> Could the data characteristics be an issue? >> >> Any help would be immensely appreciated. >> >> Thanks >> >
Re: Help wanted on securing spark with Apache Knox / JWT
You need to use the spark.ui.filters setting on the history server https://spark.apache.org/docs/latest/configuration.html#spark-ui: spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.type=org.apache.hadoop.security.authentication.server.JWTRedirectAuthenticationHandler spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.authentication.provider.url=https:// :8443/gateway/knoxsso/api/v1/websso ...etc On Thu, Jul 11, 2024 at 4:18 PM Thomas Mauran wrote: > Hello, > I am sending this email to the mailing list, to get your help on a problem > that I can't seem to resolve myself. > > I am trying to secure Spark history ui running with Yarn as master using > Apache Knox. > > From the Knox configuration point of view I managed to secure the Spark > service, if I go on https://:8443/gateway/default/spark3history I have to > login using SSO then I get redirected to spark history server web ui which > works as expected. > > But if I directly access Spark without getting logged in I don't get > redirected to Knox login page which is what I would like to have, same as > HDFS and YarnUI. > > From what I see in Spark documentation the webui needs to be protected > using the filter system. I can' t seem to find a filter to protect my Spark > history UI using Knox, I protected both HDFS and Yarn by adding this in > core-site.xml which works fine. > > > hadoop.http.authentication.type > > org.apache.hadoop.security.authentication.server.JWTRedirectAuthenticationHandler > > hadoop.http.authentication.authentication.provider.url > > https://:8443/gateway/knoxsso/api/v1/websso > > > > hadoop.http.authentication.public.key.pem > > > Adding those properties allowed me to get redirected to knox host page > when I didn' t login yet. > > I am wondering if you knew how to secure Spark history UI to have the same > behavior. > > Do you know what configuration I am missing to redirect it back to the > Knox gateway login page from the Spark history UI as for the other services > where the JWT token is passed and used for keeping the user session ? > > I tried to play with the filters especially > org.apache.hadoop.security.authentication.server.AuthenticationFilter but > didn' t manage to get anything working, so I don' t even know if this is > the right way to do. > > Thanks for your answer > > -- Adam Binford
Help wanted on securing spark with Apache Knox / JWT
Hello, I am sending this email to the mailing list to get your help on a problem that I can't seem to resolve myself. I am trying to secure the Spark history UI, running with Yarn as master, using Apache Knox. From the Knox configuration point of view I managed to secure the Spark service: if I go to https://:8443/gateway/default/spark3history I have to log in using SSO and then I get redirected to the Spark history server web UI, which works as expected. But if I directly access Spark without being logged in I don't get redirected to the Knox login page, which is what I would like to have, same as for HDFS and the Yarn UI. From what I see in the Spark documentation the web UI needs to be protected using the filter system. I can't seem to find a filter to protect my Spark history UI using Knox. I protected both HDFS and Yarn by adding this in core-site.xml, which works fine:

<property>
  <name>hadoop.http.authentication.type</name>
  <value>org.apache.hadoop.security.authentication.server.JWTRedirectAuthenticationHandler</value>
</property>
<property>
  <name>hadoop.http.authentication.authentication.provider.url</name>
  <value>https://<knox-hostname>:8443/gateway/knoxsso/api/v1/websso</value>
</property>
<property>
  <name>hadoop.http.authentication.public.key.pem</name>
  <value><token></value>
</property>

Adding those properties allowed me to get redirected to the Knox host page when I wasn't logged in yet. I am wondering if you know how to secure the Spark history UI to have the same behavior. Do you know what configuration I am missing to redirect back to the Knox gateway login page from the Spark history UI, as for the other services where the JWT token is passed and used for keeping the user session? I tried to play with the filters, especially org.apache.hadoop.security.authentication.server.AuthenticationFilter, but didn't manage to get anything working, so I don't even know if this is the right way to do it. Thanks for your answer
[Issue] Spark SQL - broadcast failure
Hi all, Been facing a weird issue lately. In our production code base, we have an explicit broadcast for a small table. It is just a lookup table that is around 1 GB in size in S3 and has just a few million records and 5 columns. The ETL was running fine, but with no change to the codebase or the infrastructure, we are getting broadcast failures. An even weirder fact is that the size of the data for the older run was 1.4 GB while for the new run it is just 900 MB. Below is the error message: Cannot broadcast table that is larger than 8 GB : 8GB. I find it extremely weird considering that the data size is very well under the thresholds. Are there any other ways to find what the issue could be and how we can rectify it? Could the data characteristics be an issue? Any help would be immensely appreciated. Thanks
running snowflake query using spark connect on a standalone cluster
I have configured a spark standalone cluster as follows: ``` # start spark master $SPARK_HOME/sbin/start-master.sh # start 2 spark workers SPARK_WORKER_INSTANCES=2 $SPARK_HOME/sbin/start-worker.sh spark://localhost:7077 # start spark connect $SPARK_HOME/sbin/start-connect-server.sh --properties-file ./connect.properties --master spark://localhost:7077 ``` My properties file is defined as follows: ``` spark.serializer=org.apache.spark.serializer.KryoSerializer spark.plugins=io.dataflint.spark.SparkDataflintPlugin spark.jars.packages org.apache.spark:spark-connect_2.12:3.5.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hudi:hudi-aws:0.15.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.spark:spark-avro_2.12:3.5.1,software.amazon.awssdk:sso:2.18.40,io.dataflint:spark_2.12:0.2.2,net.snowflake:spark-snowflake_2.12:2.16.0-spark_3.4,net.snowflake:snowflake-jdbc:3.16.1 spark.driver.extraJavaOptions=-verbose:class spark.executor.extraJavaOptions=-verbose:class ``` Now I start my pyspark job which connects with this remote instance and then tries to query the table. The snowflake query is fired correctly. It shows up in my Snowflake query history, but then I start getting failures. ``` 24/07/07 22:37:26 INFO ErrorUtils: Spark Connect error during: execute. UserId: pbd. SessionId: 462444eb-82d3-475b-8dd0-ce35d5047405. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (192.168.29.6 executor 1): java.lang.ClassNotFoundException: net.snowflake.spark.snowflake.io.SnowflakeResultSetPartition at org.apache.spark.executor.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:124) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527) at java.base/java.lang.Class.forName0(Native Method) at java.base/java.lang.Class.forName(Class.java:398) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71) at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003) at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870) at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201) at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687) at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496) at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390) at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228) at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687) at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489) at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.ClassNotFoundException: net.snowflake.spark.snowflake.io.SnowflakeResultSetPartition at java.base/java.lang.ClassLoader.findClass(ClassLoader.java:724) at 
org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.java:35) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594) at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.java:40) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527) at org.apache.spark.executor.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:109) ... 21 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060) at
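For reference, a typical Snowflake read through the spark-snowflake connector looks roughly like the sketch below (the connection values are placeholders). The ClassNotFoundException above is raised on an executor, so the thing to verify is that the connector and JDBC jars listed in spark.jars.packages are actually being distributed to the workers' executor classpath and not just to the Connect server's driver.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Placeholder connection options for the spark-snowflake connector.
sf_options = {
    "sfURL": "myaccount.snowflakecomputing.com",
    "sfUser": "my_user",
    "sfPassword": "my_password",
    "sfDatabase": "MY_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "MY_WH",
}

df = (
    spark.read.format("net.snowflake.spark.snowflake")
    .options(**sf_options)
    .option("dbtable", "MY_TABLE")
    .load()
)
df.show()  # this is the step that needs the connector classes on the executors
```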
How to use spark.connect.Plan from dependency in a custom Spark Connect RelationPlugin?
Hello! I'm trying to create my own Spark Connect plugin by implementing `org.apache.spark.sql.connect.plugin.RelationPlugin`. My target is Spark 4.0. What I did:
1. I added a compile-time dependency on `spark-connect-common_2.13`
2. I added `import public "spark/connect/base.proto";` to my message
3. I added `spark.connect.Plan data = 1;` to my message
I can successfully run `mvn generate-sources`, but the generated code cannot be compiled. I got a lot of errors:
- `org.sparkproject.connect.protobuf.Descriptors.FileDescriptor cannot be converted to com.google.protobuf.Descriptors.FileDescriptor`
- `type argument org.apache.spark.connect.proto.Plan is not within bounds of type-variable MType`
- `org.apache.spark.connect.proto.Plan cannot be converted to com.google.protobuf.MessageLite`
I do not use any `LITE` runtime optimization for my proto message. I even tried adding `option java_generate_equals_and_hash = true;` and `option optimize_for=SPEED;` explicitly, but it did not help. Is there any way to use `spark.connect.Proto` from `spark-connect-common` in custom plugins? Or is there any other way to pass a `DataFrame` from the client to a plugin that implements `RelationPlugin`? Thanks in advance! - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Does Spark 4.0 add Spark Streaming SQL
Does Spark 4.0 support Spark Streaming SQL for integrating with CDC, i.e. writing stream-processing logic and window functions in SQL? 308027...@qq.com
Help Needed: Distributed Logging in Spark Application
Hi, In my Spark application, I need to log data row by row directly from the executors to avoid overwhelming the driver's memory, which is already under pressure. I am exploring the possibility of implementing a distributed logging strategy where each executor logs its output directly, rather than collecting all data to the driver first. Can you recommend best practices or tools for implementing distributed logging from executors in Spark applications? Thank you very much for your time and consideration. I look forward to your response. Best, ZS
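One common pattern, sketched below under the assumption that per-row logging on the executors is acceptable for the data volume: do the logging inside foreachPartition, so each executor writes to its own log (collected by the usual executor log mechanisms, e.g. the Spark UI or YARN log aggregation) and nothing is pulled back to the driver. The logger name and DataFrame below are placeholders.

```python
import logging
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("executor-side-logging-sketch").getOrCreate()
df = spark.range(1000)  # placeholder for the real DataFrame

def log_partition(rows):
    # This function runs on the executors; the logger writes to the executor's
    # stdout/stderr, which ends up in the per-executor logs, not on the driver.
    log = logging.getLogger("row_audit")
    if not log.handlers:
        logging.basicConfig(level=logging.INFO)
    for row in rows:
        log.info("processed row: %s", row.asDict())

df.foreachPartition(log_partition)
```

For higher volumes, the same foreachPartition hook is where rows are typically pushed to an external sink (Kafka, a log service, object storage) instead of the executor log files.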
Re: Spark Decommission
Thank Ahmed, thats useful information On Wed, Jun 19, 2024 at 1:36 AM Khaldi, Ahmed wrote: > Hey Rajesh, > > > > Fromm y experience, it’s a stable feature, however you must keep in mind > that it will not guarantee that you will not lose the data that is on the > pods of the nodes getting a spot kill. Once you have a spot a kill, you > have 120s to give the node back to the cloud provider. This is when the > decommission script will start and sometimes 120s is enough to migrate the > shuffle/rdd blocks, and sometimes it’s not. It really depends on your > workload and data at the end. > > > > > > *Best regards,* > > > > *Ahmed Khaldi* > > Solutions Architect > > > > *NetApp Limited.* > > +33617424566 Mobile Phone > > kah...@netapp.com > > > > > > > > *From: *Rajesh Mahindra > *Date: *Tuesday, 18 June 2024 at 23:54 > *To: *user@spark.apache.org > *Subject: *Spark Decommission > > Vous ne recevez pas souvent de courriers de la part de rjshmh...@gmail.com. > Découvrez pourquoi cela est important > <https://aka.ms/LearnAboutSenderIdentification> > > > > *EXTERNAL EMAIL - USE CAUTION when clicking links or attachments * > > > > Hi folks, > > > > I am planning to leverage the "Spark Decommission" feature in production > since our company uses SPOT instances on Kubernetes. I wanted to get a > sense of how stable the feature is for production usage and if any one has > thoughts around trying it out in production, especially in kubernetes > environment. > > > > Thanks, > > Rajesh >
Re: Help in understanding Exchange in Spark UI
OK, I gave an answer in StackOverflow. Happy reading Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Thu, 20 Jun 2024 at 17:15, Dhruv Singla wrote: > Hey Team > > I've posted a question of StackOverflow. The link is - > https://stackoverflow.com/questions/78644118/understanding-exchange-in-spark-ui > > I haven't got any responses yet. If possible could you please look into > it? If you need me to write the question in the mailing list, I can do that > as well. > > Thanks & Regards > Dhruv >
Help in understanding Exchange in Spark UI
Hey Team I've posted a question of StackOverflow. The link is - https://stackoverflow.com/questions/78644118/understanding-exchange-in-spark-ui I haven't got any responses yet. If possible could you please look into it? If you need me to write the question in the mailing list, I can do that as well. Thanks & Regards Dhruv
Re: [SPARK-48423] Unable to save ML Pipeline to azure blob storage
Hello Team, I am pinging back on this thread to get a pair of eyes on this issue. Ticket: https://issues.apache.org/jira/browse/SPARK-48423 On Thu, 6 Jun 2024 at 00:19, Chhavi Bansal wrote: > Hello team, > I was exploring on how to save ML pipeline to azure blob storage, but was > setback by an issue where it complains of `fs.azure.account.key` not > being found in the configuration even when I have provided the values in > the pipelineModel.option(key1,value1) field. I considered raising a > ticket on spark https://issues.apache.org/jira/browse/SPARK-48423, where > I describe the entire scenario. I tried debugging the code and found that > this key is being explicitly asked for in the code. The only solution was > to again set it part of spark.conf which could result to a race condition > since we work on multi-tenant architecture. > > > > Since saving to Azure blob storage would be common, Can someone please > guide me if I am missing something in the `.option` clause? > > > > I would be happy to make a contribution to the code if someone can shed > some light on how this could be solved. > > -- > Thanks and Regards, > Chhavi Bansal > -- Thanks and Regards, Chhavi Bansal
Re: Spark Decommission
Hey Rajesh, Fromm y experience, it’s a stable feature, however you must keep in mind that it will not guarantee that you will not lose the data that is on the pods of the nodes getting a spot kill. Once you have a spot a kill, you have 120s to give the node back to the cloud provider. This is when the decommission script will start and sometimes 120s is enough to migrate the shuffle/rdd blocks, and sometimes it’s not. It really depends on your workload and data at the end. Best regards, Ahmed Khaldi Solutions Architect NetApp Limited. +33617424566 Mobile Phone kah...@netapp.com<mailto:pump...@netapp.com> From: Rajesh Mahindra Date: Tuesday, 18 June 2024 at 23:54 To: user@spark.apache.org Subject: Spark Decommission Vous ne recevez pas souvent de courriers de la part de rjshmh...@gmail.com. Découvrez pourquoi cela est important<https://aka.ms/LearnAboutSenderIdentification> EXTERNAL EMAIL - USE CAUTION when clicking links or attachments Hi folks, I am planning to leverage the "Spark Decommission" feature in production since our company uses SPOT instances on Kubernetes. I wanted to get a sense of how stable the feature is for production usage and if any one has thoughts around trying it out in production, especially in kubernetes environment. Thanks, Rajesh
Spark Decommission
Hi folks, I am planning to leverage the "Spark Decommission" feature in production since our company uses SPOT instances on Kubernetes. I wanted to get a sense of how stable the feature is for production usage and if any one has thoughts around trying it out in production, especially in kubernetes environment. Thanks, Rajesh
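For anyone evaluating the same setup, these are the main configuration switches involved in graceful decommissioning; a sketch of enabling them when building the session is below (they can equally be set in spark-defaults.conf or via spark-submit --conf). As noted in the replies in this thread, whether the spot-termination notice is long enough to migrate blocks still depends on the workload and data.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("decommission-sketch")
    # Ask Spark to decommission executors gracefully instead of losing them outright.
    .config("spark.decommission.enabled", "true")
    # Migrate cached RDD blocks and shuffle blocks off the executor before it goes away.
    .config("spark.storage.decommission.enabled", "true")
    .config("spark.storage.decommission.rddBlocks.enabled", "true")
    .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
    .getOrCreate()
)
```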
Re: Update mode in spark structured streaming
Best to qualify your thoughts with an example. By using the foreachBatch function combined with the update output mode in Spark Structured Streaming, you can effectively handle and integrate late-arriving data into your aggregations. This approach will allow you to continuously update your aggregated results with both on-time and late data. Example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, window, sum as spark_sum, max as spark_max, current_timestamp

# Create Spark session
spark = SparkSession.builder.appName("exampleWithRate").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Simulate a stream of data with an event time
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
base_timestamp = current_timestamp()
stream = stream.withColumn("event_time", (base_timestamp + (col("value") * 60).cast("interval second")).cast("timestamp"))
stream = stream.withColumn("value", col("value") % 10)

def process_batch(batch_df, batch_id):
    # Read current state from an external store (simulated here as a static DataFrame)
    current_state = spark.createDataFrame(
        [(1, 10, '2024-06-13 10:00:00')],
        ["key", "total_value", "max_event_time"]
    ).withColumn("max_event_time", col("max_event_time").cast("timestamp"))

    # Perform aggregation including late data handling
    aggregated_batch = batch_df.groupBy("value").agg(
        spark_sum("value").alias("total_value"),
        spark_max("event_time").alias("max_event_time")
    )

    # Merge with current state
    merged_state = current_state.union(aggregated_batch)

    # Show the merged state
    merged_state.show(truncate=False)

# Define your streaming query
streaming_query = (
    stream
    .withWatermark("event_time", "10 minutes")
    .writeStream
    .foreachBatch(process_batch)
    .outputMode("update")
    .start()
)

# Await termination
streaming_query.awaitTermination()

and the output

+---+-----------+-------------------+
|key|total_value|max_event_time     |
+---+-----------+-------------------+
|1  |10         |2024-06-13 10:00:00|
+---+-----------+-------------------+

+---+-----------+-----------------------+
|key|total_value|max_event_time         |
+---+-----------+-----------------------+
|1  |10         |2024-06-13 10:00:00    |
|0  |0          |2024-06-15 16:22:23.642|
|8  |8          |2024-06-15 16:20:23.642|
|2  |4          |2024-06-15 16:24:23.642|
|4  |8          |2024-06-15 16:26:23.642|
|9  |9          |2024-06-15 16:21:23.642|
|5  |5          |2024-06-15 16:17:23.642|
|1  |2          |2024-06-15 16:23:23.642|
|3  |6          |2024-06-15 16:25:23.642|
|6  |6          |2024-06-15 16:18:23.642|
|7  |7          |2024-06-15 16:19:23.642|
+---+-----------+-----------------------+

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Fri, 14 Jun 2024 at 20:13, Om Prakash wrote: > Hi Team, > > Hope you all are doing well.
I have run into a use case in which I want to > do the aggregation in foreachbatch and use update mode for handling late > data in structured streaming. Will this approach work in effectively > capturing late arriving data in the aggregations? Please help. > > > > Thanking you, > Om Prakash >
Update mode in spark structured streaming
Hi Team, Hope you all are doing well. I have run into a use case in which I want to do the aggregation in foreachbatch and use update mode for handling late data in structured streaming. Will this approach work in effectively capturing late arriving data in the aggregations? Please help. Thanking you, Om Prakash
Re: Re: OOM issue in Spark Driver
In a nutshell, the culprit for the OOM issue in your Spark driver appears to be memory leakage or inefficient memory usage within your application. This could be caused by factors such as: 1. Accumulation of data or objects in memory over time without proper cleanup. 2. Inefficient data processing or transformations leading to excessive memory usage. 3. Long-running tasks or stages that accumulate memory usage. 4. Suboptimal Spark configuration settings, such as insufficient memory allocation for the driver or executors. 5. Check stages and executor tabs in Spark GUI HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime PhD Imperial College London London, United Kingdom view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)". Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 11 Jun 2024 at 06:50, Lingzhe Sun wrote: > Hi Kathick, > > That suggests you're not performing stateful operations and therefore > there're no state related metrics. You should consider other aspects that > may cause OOM. > Checking logs will always be a good start. And it would be better if some > colleague of you is familiar with JVM and OOM related issues. > > BS > Lingzhe Sun > > > *From:* Karthick Nk > *Date:* 2024-06-11 13:28 > *To:* Lingzhe Sun > *CC:* Andrzej Zera ; User > *Subject:* Re: Re: OOM issue in Spark Driver > Hi Lingzhe, > > I am able to get the below stats(i.e input rate, process rate, input rows > etc..), but not able to find the exact stats that Andrzej asking (ie. > Aggregated > Number Of Total State Rows), Could you guide me on how do I get those > details for states under structured streaming. > [image: image.png] > > Details: > I am using Databricks runtime version: 13.3 LTS (includes Apache Spark > 3.4.1, Scala 2.12) > Driver and worker type: > [image: image.png] > > > Thanks > > > On Tue, Jun 11, 2024 at 7:34 AM Lingzhe Sun > wrote: > >> Hi Kathick, >> >> I believed that what Andrzej means is that you should check >> Aggregated Number Of Total State Rows >> metirc which you could find in the structured streaming UI tab, which >> indicate the total number of your states, only if you perform stateful >> operations. If that increase indefinitely, you should probably check your >> code logic. >> >> BS >> Lingzhe Sun >> >> >> *From:* Karthick Nk >> *Date:* 2024-06-09 14:45 >> *To:* Andrzej Zera >> *CC:* user >> *Subject:* Re: OOM issue in Spark Driver >> Hi Andrzej, >> >> We are using both driver and workers too, >> Details are as follows >> Driver size:128GB Memory, 64 cores. 
>> Executors size: 64GB Memory, 32 Cores (Executors 1 to 10 - Autoscale) >> >> Workers memory usage: >> One of the worker memory usage screenshot: >> [image: image.png] >> >> >> State metrics details below: >> [image: image.png] >> [image: image.png] >> >> I am not getting memory-related info from the structure streaming tab, >> Could you help me here? >> >> Please let me know if you need more details. >> >> If possible we can connect once at your time and look into the issue >> which will be more helpful to me. >> >> Thanks >> >> On Sat, Jun 8, 2024 at 2:41 PM Andrzej Zera >> wrote: >> >>> Hey, do you perform stateful operations? Maybe your state is growing >>> indefinitely - a screenshot with state metrics would help (you can find it >>> in Spark UI -> Structured Streaming -> your query). Do you have a >>> driver-only cluster or do you have workers too? What's the memory usage >>> profile at workers? >>>
Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)
Hi Chhavi, Currently there is no way to handle backtick(`) spark StructType. Hence the field name a.b and `a.b` are completely different within StructType. To handle that, I have added a custom implementation fixing StringIndexer# validateAndTransformSchema. You can refer to the code on my github <https://github.com/skale1990/LearnSpark/blob/main/src/main/java/com/som/learnspark/TestCustomStringIndexer.scala> . *Regards,* *Someshwar Kale * On Sat, Jun 8, 2024 at 12:00 PM Chhavi Bansal wrote: > Hi Someshwar, > Thanks for the response, I have added my comments to the ticket > <https://issues.apache.org/jira/browse/SPARK-48463>. > > > Thanks, > Chhavi Bansal > > On Thu, 6 Jun 2024 at 17:28, Someshwar Kale wrote: > >> As a fix, you may consider adding a transformer to rename columns >> (perhaps replace all columns with dot to underscore) and use the renamed >> columns in your pipeline as below- >> >> val renameColumn = new >> RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude") >> val si = new >> StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee") >> val pipeline = new Pipeline().setStages(Array(renameColumn, si)) >> pipeline.fit(flattenedDf).transform(flattenedDf).show() >> >> >> refer my comment >> <https://issues.apache.org/jira/browse/SPARK-48463?focusedCommentId=17852751&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17852751> >> for >> elaboration. >> Thanks!! >> >> *Regards,* >> *Someshwar Kale* >> >> >> >> >> >> On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal >> wrote: >> >>> Hello team >>> I was exploring feature transformation exposed via Mllib on nested >>> dataset, and encountered an error while applying any transformer to a >>> column with dot notation naming. I thought of raising a ticket on spark >>> https://issues.apache.org/jira/browse/SPARK-48463, where I have >>> mentioned the entire scenario. >>> >>> I wanted to get suggestions on what would be the best way to solve the >>> problem while using the dot notation. One workaround is to use`_` while >>> flattening the dataframe, but that would mean having an additional overhead >>> to convert back to `.` (dot notation ) since that’s the convention for our >>> other flattened data. >>> >>> I would be happy to make a contribution to the code if someone can shed >>> some light on how this could be solved. >>> >>> >>> >>> -- >>> Thanks and Regards, >>> Chhavi Bansal >>> >> > > -- > Thanks and Regards, > Chhavi Bansal >
Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)
Hi Someshwar, Thanks for the response, I have added my comments to the ticket <https://issues.apache.org/jira/browse/SPARK-48463>. Thanks, Chhavi Bansal On Thu, 6 Jun 2024 at 17:28, Someshwar Kale wrote: > As a fix, you may consider adding a transformer to rename columns (perhaps > replace all columns with dot to underscore) and use the renamed columns in > your pipeline as below- > > val renameColumn = new > RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude") > val si = new > StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee") > val pipeline = new Pipeline().setStages(Array(renameColumn, si)) > pipeline.fit(flattenedDf).transform(flattenedDf).show() > > > refer my comment > <https://issues.apache.org/jira/browse/SPARK-48463?focusedCommentId=17852751&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17852751> > for > elaboration. > Thanks!! > > *Regards,* > *Someshwar Kale* > > > > > > On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal > wrote: > >> Hello team >> I was exploring feature transformation exposed via Mllib on nested >> dataset, and encountered an error while applying any transformer to a >> column with dot notation naming. I thought of raising a ticket on spark >> https://issues.apache.org/jira/browse/SPARK-48463, where I have >> mentioned the entire scenario. >> >> I wanted to get suggestions on what would be the best way to solve the >> problem while using the dot notation. One workaround is to use`_` while >> flattening the dataframe, but that would mean having an additional overhead >> to convert back to `.` (dot notation ) since that’s the convention for our >> other flattened data. >> >> I would be happy to make a contribution to the code if someone can shed >> some light on how this could be solved. >> >> >> >> -- >> Thanks and Regards, >> Chhavi Bansal >> > -- Thanks and Regards, Chhavi Bansal
Re: OOM issue in Spark Driver
Hey, do you perform stateful operations? Maybe your state is growing indefinitely - a screenshot with state metrics would help (you can find it in Spark UI -> Structured Streaming -> your query). Do you have a driver-only cluster or do you have workers too? What's the memory usage profile at workers? Regards, Andrzej sob., 8 cze 2024 o 10:39 Karthick Nk napisał(a): > Hi All, > > I am using the pyspark structure streaming with Azure Databricks for data > load process. > > In the Pipeline I am using a Job cluster and I am running only one > pipeline, I am getting the OUT OF MEMORY issue while running for a > long time. When I inspect the metrics of the cluster I found that, the > memory usage getting increased by time by time even when there is no > huge volume of data. > > [image: image.png] > > > [image: image.png] > > After 4 hours of running the pipeline continuously, I am getting out of > memory issue where used memory in the driver getting increased from 47 GB > to 111 GB which is almost triple, I am unable to understand why this many > memory occcupied in the driver. Am I missing anything here to notice? Could > you guide me to figure out the root cause? > > Note: > 1. I confirmed persist and unpersist that I used in code taken care > properly for every batch execution. > 2. Data is not increasing when time passes, (stream data getting almost > same amount of data for every batch) > > > Thanks, > > > >
OOM issue in Spark Driver
Hi All, I am using PySpark Structured Streaming with Azure Databricks for a data load process. In the pipeline I am using a Job cluster and I am running only one pipeline, and I am getting an OUT OF MEMORY issue when it runs for a long time. When I inspect the metrics of the cluster I find that the memory usage keeps increasing over time even when there is no huge volume of data. [image: image.png] [image: image.png] After 4 hours of running the pipeline continuously, I get an out of memory issue where the used memory in the driver has increased from 47 GB to 111 GB, which is almost triple. I am unable to understand why this much memory is occupied in the driver. Am I missing anything here? Could you guide me to figure out the root cause? Note: 1. I confirmed that the persist and unpersist calls I use in the code are taken care of properly for every batch execution. 2. The data volume is not increasing as time passes (the stream gets almost the same amount of data for every batch). Thanks,
Re: 7368396 - Apache Spark 3.5.1 (Support)
Hi Alex, Spark is an open source software available under Apache License 2.0 ( https://www.apache.org/licenses/), further details can be found here in the FAQ page (https://spark.apache.org/faq.html). Hope this helps. Thanks, Sadha On Thu, Jun 6, 2024, 1:32 PM SANTOS SOUZA, ALEX wrote: > Hey guys! > > > > I am part of the team responsible for software approval at EMBRAER S.A. > We are currently in the process of approving the Apache Spark 3.5.1 > software and are verifying the licensing of the application. > Therefore, I would like to kindly request you to answer the questions > below. > > -What type of software? (Commercial, Freeware, Component, etc...) > A: > > -What is the licensing model for commercial use? (Subscription, Perpetual, > GPL, etc...) > A: > > -What type of license? (By user, Competitor, Device, Server or others)? > A: > > -Number of installations allowed per license/subscription? > A: > > Can it be used in the defense and aerospace industry? (Company that > manufactures products for national defense) > A: > > -Does the license allow use in any location regardless of the origin of > the purchase (tax restriction)? > A: > > -Where can I find the End User License Agreement (EULA) for the version in > question? > A: > > > > Desde já, muito obrigado e qualquer dúvida estou à disposição. / Thank you > very much in advance and I am at your disposal if you have any questions. > > > Att, > > > Alex Santos Souza > > Software Asset Management - Embraer > > WhatsApp: +55 12 99731-7579 > > E-mail: alex.santosso...@dxc.com > > DXC Technology > > São José dos Campos, SP - Brazil > >
7368396 - Apache Spark 3.5.1 (Support)
Hey guys! I am part of the team responsible for software approval at EMBRAER S.A. We are currently in the process of approving the Apache Spark 3.5.1 software and are verifying the licensing of the application. Therefore, I would like to kindly request you to answer the questions below. -What type of software? (Commercial, Freeware, Component, etc...) A: -What is the licensing model for commercial use? (Subscription, Perpetual, GPL, etc...) A: -What type of license? (By user, Competitor, Device, Server or others)? A: -Number of installations allowed per license/subscription? A: Can it be used in the defense and aerospace industry? (Company that manufactures products for national defense) A: -Does the license allow use in any location regardless of the origin of the purchase (tax restriction)? A: -Where can I find the End User License Agreement (EULA) for the version in question? A: Desde já, muito obrigado e qualquer dúvida estou à disposição. / Thank you very much in advance and I am at your disposal if you have any questions. Att, [cid:babbaea5-d892-4b6e-abd9-d0da0cc3e296] Alex Santos Souza Software Asset Management - Embraer WhatsApp: +55 12 99731-7579 E-mail: alex.santosso...@dxc.com DXC Technology São José dos Campos, SP - Brazil
Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)
As a fix, you may consider adding a transformer to rename columns (perhaps replace all columns with dot to underscore) and use the renamed columns in your pipeline as below- val renameColumn = new RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude") val si = new StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee") val pipeline = new Pipeline().setStages(Array(renameColumn, si)) pipeline.fit(flattenedDf).transform(flattenedDf).show() refer my comment <https://issues.apache.org/jira/browse/SPARK-48463?focusedCommentId=17852751&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17852751> for elaboration. Thanks!! *Regards,* *Someshwar Kale* On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal wrote: > Hello team > I was exploring feature transformation exposed via Mllib on nested > dataset, and encountered an error while applying any transformer to a > column with dot notation naming. I thought of raising a ticket on spark > https://issues.apache.org/jira/browse/SPARK-48463, where I have mentioned > the entire scenario. > > I wanted to get suggestions on what would be the best way to solve the > problem while using the dot notation. One workaround is to use`_` while > flattening the dataframe, but that would mean having an additional overhead > to convert back to `.` (dot notation ) since that’s the convention for our > other flattened data. > > I would be happy to make a contribution to the code if someone can shed > some light on how this could be solved. > > > > -- > Thanks and Regards, > Chhavi Bansal >
[SPARK-48423] Unable to save ML Pipeline to azure blob storage
Hello team, I was exploring on how to save ML pipeline to azure blob storage, but was setback by an issue where it complains of `fs.azure.account.key` not being found in the configuration even when I have provided the values in the pipelineModel.option(key1,value1) field. I considered raising a ticket on spark https://issues.apache.org/jira/browse/SPARK-48423, where I describe the entire scenario. I tried debugging the code and found that this key is being explicitly asked for in the code. The only solution was to again set it part of spark.conf which could result to a race condition since we work on multi-tenant architecture. Since saving to Azure blob storage would be common, Can someone please guide me if I am missing something in the `.option` clause? I would be happy to make a contribution to the code if someone can shed some light on how this could be solved. -- Thanks and Regards, Chhavi Bansal
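A minimal sketch of the session-level workaround referred to above (setting the account key on the Hadoop configuration before saving), assuming an ABFS/abfss destination; the storage account, container, key and model path below are placeholders. This is exactly the global setting the post wants to avoid in a multi-tenant setup, so it is only a stopgap until the `.option(...)` route works for ML persistence.

```python
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel

spark = SparkSession.builder.appName("azure-pipeline-save-sketch").getOrCreate()

# Placeholder storage account and key; in practice these come from a secret store.
storage_account = "mystorageaccount"
account_key = "<account-key>"

# Make the key visible to the Hadoop ABFS filesystem used by the ML writer.
spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", account_key
)

model = PipelineModel.load("/tmp/some_local_model")  # placeholder fitted pipeline
model.write().overwrite().save(
    f"abfss://mycontainer@{storage_account}.dfs.core.windows.net/models/my_pipeline"
)
```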
[SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)
Hello team I was exploring feature transformation exposed via Mllib on nested dataset, and encountered an error while applying any transformer to a column with dot notation naming. I thought of raising a ticket on spark https://issues.apache.org/jira/browse/SPARK-48463, where I have mentioned the entire scenario. I wanted to get suggestions on what would be the best way to solve the problem while using the dot notation. One workaround is to use`_` while flattening the dataframe, but that would mean having an additional overhead to convert back to `.` (dot notation ) since that’s the convention for our other flattened data. I would be happy to make a contribution to the code if someone can shed some light on how this could be solved. -- Thanks and Regards, Chhavi Bansal
Inquiry Regarding Security Compliance of Apache Spark Docker Image
Dear Apache Team, I hope this email finds you well. We are a team from Ernst and Young LLP - India, dedicated to providing innovative supply chain solutions for a diverse range of clients. Our team recently encountered a pivotal use case necessitating the utilization of PySpark for a project aimed at handling substantial volumes of data. As part of our deployment strategy, we are endeavouring to implement a Spark-based application on our Azure Kubernetes service. Regrettably, we have encountered challenges from a security perspective with the latest Apache Spark Docker image, specifically apache/spark-py:latest. Our security team has meticulously conducted an assessment and has generated a comprehensive vulnerability report highlighting areas of concern. Given the non-compliance of the Docker image with our organization's stringent security protocols, we find ourselves unable to proceed with its integration into our applications. We attach the vulnerability report herewith for your perusal. Considering these circumstances, we kindly request your esteemed team to provide any resolutions or guidance that may assist us in mitigating the identified security vulnerabilities. Your prompt attention to this matter would be greatly appreciated, as it is crucial for the successful deployment and operation of our Spark-based application within our infrastructure. Thank you for your attention to this inquiry, and we look forward to your valued support and assistance. Please find attachment for the vulnerability report Best Regards, Tonmoy Sagar | Sr. Consultant | Advisory | Asterisk Ernst & Young LLP C-401, Panchshil Tech Park One, Yerawada, Pune, Maharashtra 411006, India Mobile: +91 8724918230 | tonmoy.sa...@in.ey.com<mailto:tonmoy.sa...@in.ey.com> Thrive in the Transformative Age with the better-connected consultants - ey.com/consulting<http://ey.com/consulting> The information contained in this communication is intended solely for the use of the individual or entity to whom it is addressed and others authorized to receive it. It may contain confidential or legally privileged information. If you are not the intended recipient you are hereby notified that any disclosure, copying, distribution or taking any action in reliance on the contents of this information is strictly prohibited and may be unlawful. If you have received this communication in error, please notify us immediately by responding to this email and then delete it from your system. The firm is neither liable for the proper and complete transmission of the information contained in this communication nor for any delay in its receipt. spark_vulnerability_report.xlsx Description: spark_vulnerability_report.xlsx - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
[ANNOUNCE] Announcing Apache Spark 4.0.0-preview1
Hi all, To enable wide-scale community testing of the upcoming Spark 4.0 release, the Apache Spark community has posted a preview release of Spark 4.0. This preview is not a stable release in terms of either API or functionality, but it is meant to give the community early access to try the code that will become Spark 4.0. If you would like to test the release, please download it, and send feedback using either the mailing lists or JIRA. There are a lot of exciting new features added to Spark 4.0, including ANSI mode by default, Python data source, polymorphic Python UDTF, string collation support, new VARIANT data type, streaming state store data source, structured logging, Java 17 by default, and many more. We'd like to thank our contributors and users for their contributions and early feedback to this release. This release would not have been possible without you. To download Spark 4.0.0-preview1, head over to the download page: https://archive.apache.org/dist/spark/spark-4.0.0-preview1 . It's also available in PyPI, with version name "4.0.0.dev1". Thanks, Wenchen
[apache-spark][spark-dataframe] DataFrameWriter.partitionBy does not guarantee previous sort result
I have a dataset that has the following schema: (timestamp, partitionKey, logValue)

I want the dataset to be sorted by timestamp, but written to files in the following directory layout: outputDir/partitionKey/files
The output file only contains logValue; that is, timestamp is used for sorting only and is not used for output. (FYI, logValue contains a textual representation of the timestamp which is not sortable)

My first attempt is to use DataFrameWriter.partitionBy:

dataset
    .sort("timestamp")
    .select("partitionKey", "logValue")
    .write()
    .partitionBy("partitionKey")
    .text("output");

However, as mentioned in SPARK-44512 (https://issues.apache.org/jira/browse/SPARK-44512), this does not guarantee the output is globally sorted. (note: I found that even setting spark.sql.optimizer.plannedWrite.enabled=false still does not guarantee sorted results in a low memory environment)

And the developers say DataFrameWriter.partitionBy does not guarantee sorted results: "Although I understand Apache Spark 3.4.0 changes the behavior like the above, I don't think there is a contract that Apache Spark's `partitionBy` operation preserves the previous ordering."

To work around this problem, I have to resort to creating a Hadoop output format by extending org.apache.hadoop.mapred.lib.MultipleTextOutputFormat and writing the files with saveAsHadoopFile:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public final class PartitionedMultipleTextOutputFormat<V> extends MultipleTextOutputFormat<Object, V> {
    @SuppressWarnings("MissingJavadocMethod")
    public PartitionedMultipleTextOutputFormat() {
        super();
    }

    @Override
    protected Object generateActualKey(final Object key, final V value) {
        // Drop the key from the output records; only the value (logValue) is written.
        return NullWritable.get();
    }

    @Override
    protected String generateFileNameForKeyValue(final Object key, final V value, final String leaf) {
        // Place each record under a subdirectory named after its key (the partitionKey).
        return new Path(key.toString(), leaf).toString();
    }
}

private static Tuple2<String, Text> mapRDDToDomainLogPair(final Row row) {
    final String domain = row.getAs("partitionKey");
    final var log = (String) row.getAs("logValue");
    final var logTextClass = new Text(log);
    return new Tuple2<>(domain, logTextClass);
}

dataset
    .sort("timestamp")
    .javaRDD()
    .mapToPair(TheClass::mapRDDToDomainLogPair)
    .saveAsHadoopFile(hdfsTmpPath, String.class, Text.class,
        PartitionedMultipleTextOutputFormat.class, GzipCodec.class);

Which seems a little bit hacky. Does anyone have a better method?
Re: [s3a] Spark is not reading s3 object content
I am reading from a single file: df = spark.read.text("s3a://test-bucket/testfile.csv") On Fri, May 31, 2024 at 5:26 AM Mich Talebzadeh wrote: > Tell Spark to read from a single file > > data = spark.read.text("s3a://test-bucket/testfile.csv") > > This clarifies to Spark that you are dealing with a single file and avoids > any bucket-like interpretation. > > HTH > > Mich Talebzadeh, > Technologist | Architect | Data Engineer | Generative AI | FinCrime > PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College > London <https://en.wikipedia.org/wiki/Imperial_College_London> > London, United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* The information provided is correct to the best of my > knowledge but of course cannot be guaranteed . It is essential to note > that, as with any advice, quote "one test result is worth one-thousand > expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von > Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". > > > On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh > wrote: > >> I will work on the first two possible causes. >> For the third one, which I guess is the real problem, Spark treats the >> testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket >> to access _spark_metadata with url >> s3a://test-bucket/testfile.csv/_spark_metadata >> testfile.csv is an object and should not be treated as a bucket. But I am >> not sure how to prevent Spark from doing that. >> >
Re: [s3a] Spark is not reading s3 object content
Tell Spark to read from a single file data = spark.read.text("s3a://test-bucket/testfile.csv") This clarifies to Spark that you are dealing with a single file and avoids any bucket-like interpretation. HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh wrote: > I will work on the first two possible causes. > For the third one, which I guess is the real problem, Spark treats the > testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket > to access _spark_metadata with url > s3a://test-bucket/testfile.csv/_spark_metadata > testfile.csv is an object and should not be treated as a bucket. But I am > not sure how to prevent Spark from doing that. >
Re: [s3a] Spark is not reading s3 object content
I will work on the first two possible causes. For the third one, which I guess is the real problem, Spark treats the testfile.csv object with the URL s3a://test-bucket/testfile.csv as a bucket in order to access _spark_metadata at s3a://test-bucket/testfile.csv/_spark_metadata. testfile.csv is an object and should not be treated as a bucket, but I am not sure how to prevent Spark from doing that.
Re: [s3a] Spark is not reading s3 object content
ok some observations - Spark job successfully lists the S3 bucket containing testfile.csv. - Spark job can retrieve the file size (33 Bytes) for testfile.csv. - Spark job fails to read the actual data from testfile.csv. - The printed content from testfile.csv is an empty list. - Spark logs show a debug message with an exception related to UserGroupInformation while trying to access the _spark_metadata file associated with testfile.csv. possible causes - Permission Issues: Spark user (likely ubuntu based on logs) might lack the necessary permissions to access the testfile.csv file or the _spark_metadata file on S3 storage. - Spark Configuration: Issues with Spark's configuration for S3 access, such as missing credentials or incorrect security settings. - Spark attempting to read unnecessary files: The _spark_metadata file might not be essential for your current operation, and Spark's attempt to read it could be causing the issue. HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Thu, 30 May 2024 at 22:29, Amin Mosayyebzadeh wrote: > The code should read testfile.csv file from s3. and print the content. It > only prints a empty list although the file has content. 
> I have also checked our custom s3 storage (Ceph based) logs and I see only > LIST operations coming from Spark, there is no GET object operation for > testfile.csv > > The only error I see in DEBUG output is these lines: > > = > 24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log > from s3a://test-bucket/testfile.csv/_spark_metadata > 24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu > (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238] > java.lang.Exception > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) > at > org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339) > at > org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465) > at > org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:311) > at > org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:352) > at > org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:64) > at > org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:48) > at > org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:91) > at > org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.(MetadataLogFileIndex.scala:52) > at > org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369) > at > org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229) > at > org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211) > at scala.Option.getOrElse(Option.scala:201) > at > org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) > at > org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) > at py4j.Gateway.invoke(Gateway.java:282) > at > py4j.commands.AbstractCommand.invokeMethod(Abs
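A side note for readers following this thread: for the "Spark Configuration" cause listed above, here is a minimal PySpark sketch of the s3a settings that are typically worth double-checking against a custom (Ceph-based) S3 endpoint. The endpoint, credentials and bucket below are placeholders, not values from this thread.

```python
from pyspark.sql import SparkSession

# Placeholder endpoint and credentials; substitute the values for your own S3/Ceph RGW.
spark = (
    SparkSession.builder
    .appName("S3ReadTest")
    .config("spark.hadoop.fs.s3a.endpoint", "https://ceph.example.com")
    .config("spark.hadoop.fs.s3a.access.key", "ACCESS_KEY")
    .config("spark.hadoop.fs.s3a.secret.key", "SECRET_KEY")
    # Path-style access is usually required for non-AWS, Ceph-style endpoints.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

df = spark.read.text("s3a://test-bucket/testfile.csv")
df.show(truncate=False)
```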
Re: [s3a] Spark is not reading s3 object content
The code should read testfile.csv file from s3. and print the content. It only prints a empty list although the file has content. I have also checked our custom s3 storage (Ceph based) logs and I see only LIST operations coming from Spark, there is no GET object operation for testfile.csv The only error I see in DEBUG output is these lines: = 24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log from s3a://test-bucket/testfile.csv/_spark_metadata 24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238] java.lang.Exception at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339) at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465) at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:311) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:352) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:64) at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:48) at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:91) at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.(MetadataLogFileIndex.scala:52) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229) at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211) at scala.Option.getOrElse(Option.scala:201) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:829) === Which I am not sure if it is related since Spark can see and list the bucket (it also returns the correct object size which is 33 Bytes.). Best, Amin On Thu, May 30, 2024 at 4:05 PM Mich Talebzadeh wrote: > Hello, > > Overall, the exit code of 0 suggests a successful run of your Spark job. > Analyze the intended purpose of your code and verify the output or Spark UI > for further confirmation. > > 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with > exitCode 0. > > what to check > > >1. 
Verify Output: If your Spark job was intended to read data from S3 >and process it, you will need to verify the output to ensure the data was >handled correctly. This might involve checking if any results were written >to a designated location or if any transformations were applied > successfully. >2. Review Code: >3. Check Spark UI: > > > HTH > > Mich Talebzadeh, > Technologist | Architect | Data Engineer | Generative AI | FinCrime > PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College > London <https://en.wikipedia.org/wiki/Imperial_College_London> > London, United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* The information provided is correct to the best of my > knowledge but of course cannot be guaranteed . It is essential to note > that, as with any advice, quote "one test result is worth one-thousand > expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von > Braun <https://en.wikipedia.org/wiki/Wernh
Re: [s3a] Spark is not reading s3 object content
Hello, Overall, the exit code of 0 suggests a successful run of your Spark job. Analyze the intended purpose of your code and verify the output or Spark UI for further confirmation. 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with exitCode 0. what to check 1. Verify Output: If your Spark job was intended to read data from S3 and process it, you will need to verify the output to ensure the data was handled correctly. This might involve checking if any results were written to a designated location or if any transformations were applied successfully. 2. Review Code: 3. Check Spark UI: HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh wrote: > Hi Mich, > > Thank you for the help and sorry about the late reply. > I ran your provided but I got "exitCode 0". Here is the complete output: > > ======= > > > 24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0 > 24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic, > amd64 > 24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22 > 24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop > library for your platform... using builtin-java classes where applicable > 24/05/30 01:23:38 INFO ResourceUtils: > == > 24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for > spark.driver. > 24/05/30 01:23:38 INFO ResourceUtils: > == > 24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest > 24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created, > executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: > , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> > name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> > name: cpus, amount: 1.0) > 24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu > 24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0 > 24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu > 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu > 24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to: > 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to: > 24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication > disabled; ui acls disabled; users with view permissions: ubuntu; groups > with view permissions: EMPTY; users with modify permissions: ubuntu; groups > with modify permissions: EMPTY > 24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver' > on port 46321. 
> 24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker > 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster > 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using > org.apache.spark.storage.DefaultTopologyMapper for getting topology > information > 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: > BlockManagerMasterEndpoint up > 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat > 24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at > /tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee > 24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8 > GiB > 24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator > 24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI > 24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on > port 4040. > 24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host > MOC-R4PAC08U33-S1C > 24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64 > 24/05/30 01:23:39 INFO Executor: Java version 11.0.22 > 24/05/30 01:23:39 INFO Executor: Starting executor with user classpath > (userClassPathFirst = false): '' > 24/05/30 01:23:39 INFO Executor: Created or updated repl
Re: [s3a] Spark is not reading s3 object content
Hi Mich, Thank you for the help and sorry about the late reply. I ran your provided but I got "exitCode 0". Here is the complete output: === 24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0 24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic, amd64 24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22 24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 24/05/30 01:23:38 INFO ResourceUtils: == 24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for spark.driver. 24/05/30 01:23:38 INFO ResourceUtils: == 24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest 24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0) 24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu 24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0 24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu 24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to: 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to: 24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: ubuntu; groups with view permissions: EMPTY; users with modify permissions: ubuntu; groups with modify permissions: EMPTY 24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver' on port 46321. 24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat 24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee 24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8 GiB 24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator 24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI 24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on port 4040. 24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host MOC-R4PAC08U33-S1C 24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64 24/05/30 01:23:39 INFO Executor: Java version 11.0.22 24/05/30 01:23:39 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): '' 24/05/30 01:23:39 INFO Executor: Created or updated repl class loader org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default. 24/05/30 01:23:39 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343. 
24/05/30 01:23:39 INFO NettyBlockTransferService: Server created on MOC-R4PAC08U33-S1C:39343 24/05/30 01:23:39 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 24/05/30 01:23:39 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None) 24/05/30 01:23:39 INFO BlockManagerMasterEndpoint: Registering block manager MOC-R4PAC08U33-S1C:39343 with 2.8 GiB RAM, BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None) 24/05/30 01:23:39 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None) 24/05/30 01:23:39 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None) 24/05/30 01:23:39 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir. 24/05/30 01:23:39 INFO SharedState: Warehouse path is 'file:/home/ubuntu/tpch-spark/spark-warehouse'. 24/05/30 01:23:40 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties 24/05/30 01:23:40 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s). 24/05/30 01:23:40 INFO MetricsSystemImpl: s3a-file-system metrics system started 24/05/30 01:23:41 INFO MetadataLogFileIndex: Reading streaming file log from s3a://test-bucket/testfile.csv/_spark_metadata 24/05/30 01:23:41 INFO FileStreamSinkLog: BatchIds found from listing: 24/05/30 01:23:43 INFO FileSo
[Spark on k8s] A issue of k8s resource creation order
Hi, team! I have a Spark on k8s issue which I posted at https://stackoverflow.com/questions/78537132/spark-on-k8s-resource-creation-order. Any help would be appreciated.
Re: Spark Protobuf Deserialization
Did you try using to_protobuf and from_protobuf ? https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html On Mon, May 27, 2024 at 15:45 Satyam Raj wrote: > Hello guys, > We're using Spark 3.5.0 for processing Kafka source that contains protobuf > serialized data. The format is as follows: > > message Request { > long sent_ts = 1; > Event[] event = 2; > } > > message Event { > string event_name = 1; > bytes event_bytes = 2; > } > > The event_bytes contains the data for the event_name. event_name is the > className of the Protobuf class. > Currently, we parse the protobuf message from the Kafka topic, and for > every event in the array, push the event_bytes to the `event_name` topic, > over which spark jobs run and use the same event_name protobuf class to > deserialize the data. > > Is there a better way to do all this in a single job? >
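For reference, a minimal sketch of from_protobuf on a Kafka source (available since Spark 3.4). The broker, topic, descriptor file path and message name below are placeholders, not taken from the original setup.

```python
from pyspark.sql.protobuf.functions import from_protobuf

# Placeholders: adjust broker, topic, descriptor path and message name to your setup.
raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "requests")
    .load()
)

# Parse the binary Kafka value column straight into a struct column.
parsed = raw.select(
    from_protobuf("value", "Request", descFilePath="/path/to/request.desc").alias("request")
)
```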
Re: [Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?
When you use applyInPandasWithState, Spark processes each input row as it arrives, regardless of whether certain columns, such as the timestamp column, contain NULL values. This behavior is useful where you want to handle incomplete or missing data gracefully within your stateful processing logic. By allowing NULL timestamps to trigger calls to the stateful function, you can implement custom handling strategies, such as skipping incomplete records, within your stateful function. However, it is important to understand that this behavior also *means that the watermark is not advanced for NULL timestamps*. The watermark is used for event-time processing in Spark Structured Streaming to track the progress of event time in your data stream, and it is typically based on the timestamp column. Since NULL timestamps do not contribute to watermark advancement, records carrying them will not move event time forward, so any event-time timeouts, windows or late-data handling that depend on the watermark will not progress because of those records.

Regarding whether you can rely on this behavior for your production code, it largely depends on your requirements and use case. If your application logic is designed to handle NULL timestamps appropriately and you have tested it to ensure it behaves as expected, then you can generally rely on this behavior. FYI, I have not tested it myself, so I cannot provide a definitive answer.

Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Mon, 27 May 2024 at 22:04, Juan Casse wrote: > I am using applyInPandasWithState in PySpark 3.5.0. > > I noticed that records with timestamp==NULL are processed (i.e., trigger a > call to the stateful function). And, as you would expect, does not advance > the watermark. > > I am taking advantage of this in my application. > > My question: Is this a supported feature of Spark? Can I rely on this > behavior for my production code? > > Thanks, > Juan >
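To make the point about handling incomplete records concrete, here is a minimal, untested sketch of skipping NULL-timestamp rows inside an applyInPandasWithState function; the column and group key names are placeholders, not from the original question.

```python
import pandas as pd
from pyspark.sql.streaming.state import GroupState

def count_valid_events(key, pdf_iter, state: GroupState):
    # key is a tuple of grouping values; state holds a single running count.
    count = state.get[0] if state.exists else 0
    for pdf in pdf_iter:
        # Rows with a NULL timestamp still reach this function but do not
        # advance the watermark, so decide explicitly how to treat them here.
        count += int(pdf["timestamp"].notna().sum())
    state.update((count,))
    yield pd.DataFrame({"key": [key[0]], "count": [count]})
```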
Spark Protobuf Deserialization
Hello guys, We're using Spark 3.5.0 to process a Kafka source that contains protobuf-serialized data. The format is as follows: message Request { long sent_ts = 1; Event[] event = 2; } message Event { string event_name = 1; bytes event_bytes = 2; } event_bytes contains the data for the event_name, and event_name is the class name of the Protobuf class. Currently, we parse the protobuf message from the Kafka topic and, for every event in the array, push the event_bytes to the `event_name` topic, over which Spark jobs run and use the same event_name protobuf class to deserialize the data. Is there a better way to do all this in a single job?
[Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?
I am using applyInPandasWithState in PySpark 3.5.0. I noticed that records with timestamp==NULL are processed (i.e., they trigger a call to the stateful function) and, as you would expect, they do not advance the watermark. I am taking advantage of this in my application. My question: is this a supported feature of Spark? Can I rely on this behavior for my production code? Thanks, Juan
Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing
Seen this before; had a very(!) complex plan behind a DataFrame, to the point where any additional transformation went OOM on the driver. A quick and ugly solution was to break the plan - convert the DataFrame to RDD and back to DF at certain points to make the plan shorter. This has obvious drawbacks, and is not recommended in general, but at least we had something working. The real, long-term solution was to replace the many ( > 200) withColumn() calls with only a few select() calls. You can easily find sources on the internet for why this is better. (It was on Spark 2.3, but I think the main principles remain.) TBH, it was easier than I expected, as it mainly involved moving pieces of code from one place to another, and not a "real", meaningful refactoring. From: Mich Talebzadeh Sent: Monday, May 27, 2024 15:43 Cc: user@spark.apache.org Subject: Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing Few ideas on top of my head for how to go about solving the problem 1. Try with subsets: Try reproducing the issue with smaller subsets of your data to pinpoint the specific operation causing the memory problems. 2. Explode or Flatten Nested Structures: If your DataFrame schema involves deep nesting, consider using techniques like explode or flattening to transform it into a less nested structure. This can reduce memory usage during operations like withColumn. 3. Lazy Evaluation: Use select before withColumn: this ensures lazy evaluation, meaning Spark only materializes the data when necessary. This can improve memory usage compared to directly calling withColumn on the entire DataFrame. 4. spark.sql.shuffle.partitions: Setting this configuration to a value close to the number of executors can improve shuffle performance and potentially reduce memory usage. 5. Spark UI Monitoring: Utilize the Spark UI to monitor memory usage throughout your job execution and identify potential memory bottlenecks. HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime London United Kingdom view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun> Von Braun<https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 27 May 2024 at 12:50, Gaurav Madan wrote: Dear Community, I'm reaching out to seek your assistance with a memory issue we've been facing while processing certain large and nested DataFrames using Apache Spark. We have encountered a scenario where the driver runs out of memory when applying the `withColumn` method on specific DataFrames in Spark 3.4.1. However, the same DataFrames are processed successfully in Spark 2.4.0. Problem Summary: For certain DataFrames, applying the `withColumn` method in Spark 3.4.1 causes the driver to choke and run out of memory. However, the same DataFrames are processed successfully in Spark 2.4.0. 
Heap Dump Analysis: We performed a heap dump analysis after enabling heap dump on out-of-memory errors, and the analysis revealed the following significant frames and local variables: ``` org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:4273) org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:1622) org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:2820) org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:2759) org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset; (DataPersistenceUtil.scala:88) org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes com.urbanclap.dp.eventpersistence.
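To illustrate the select() suggestion above, here is a small, generic sketch (the column names are made up, not from this thread) of collapsing repeated withColumn calls into a single projection so the logical plan stays flat.

```python
from pyspark.sql import functions as F

# Instead of hundreds of chained withColumn calls, each of which adds another
# projection node to the logical plan:
#   df = df.withColumn("a_plus_1", F.col("a") + 1)
#   df = df.withColumn("b_upper", F.upper(F.col("b")))
#   ...

# build the expressions up front and apply one select:
df = df.select(
    "*",
    (F.col("a") + 1).alias("a_plus_1"),
    F.upper(F.col("b")).alias("b_upper"),
)
```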
Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing
Few ideas on top of my head for how to go about solving the problem 1. Try with subsets: Try reproducing the issue with smaller subsets of your data to pinpoint the specific operation causing the memory problems. 2. Explode or Flatten Nested Structures: If your DataFrame schema involves deep nesting, consider using techniques like explode or flattening to transform it into a less nested structure. This can reduce memory usage during operations like withColumn. 3. Lazy Evaluation: Use select before withColumn: this ensures lazy evaluation, meaning Spark only materializes the data when necessary. This can improve memory usage compared to directly calling withColumn on the entire DataFrame. 4. spark.sql.shuffle.partitions: Setting this configuration to a value close to the number of executors can improve shuffle performance and potentially reduce memory usage. 5. Spark UI Monitoring: Utilize the Spark UI to monitor memory usage throughout your job execution and identify potential memory bottlenecks. HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 27 May 2024 at 12:50, Gaurav Madan wrote: > Dear Community, > > I'm reaching out to seek your assistance with a memory issue we've been > facing while processing certain large and nested DataFrames using Apache > Spark. We have encountered a scenario where the driver runs out of memory > when applying the `withColumn` method on specific DataFrames in Spark > 3.4.1. However, the same DataFrames are processed successfully in Spark > 2.4.0. > > > *Problem Summary:*For certain DataFrames, applying the `withColumn` > method in Spark 3.4.1 causes the driver to choke and run out of memory. > However, the same DataFrames are processed successfully in Spark 2.4.0. 
> > > *Heap Dump Analysis:*We performed a heap dump analysis after enabling > heap dump on out-of-memory errors, and the analysis revealed the following > significant frames and local variables: > > ``` > > org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset; > (Dataset.scala:4273) > > org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) > bytes > > org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset; > (Dataset.scala:1622) > > org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) > bytes > > org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset; > (Dataset.scala:2820) > > org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) > bytes > > org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset; > (Dataset.scala:2759) > > org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) > bytes > > com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset; > (DataPersistenceUtil.scala:88) > > org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) > bytes > > com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V > (DataPersistenceUtil.scala:19) > > org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) > bytes > > com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V > (BronzeStep.scala:23) > > org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) > bytes > > com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V > (MainJob.scala:78) > > org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,8
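As a concrete illustration of suggestion 2 above (flattening nested structures before heavy column manipulation), here is a generic sketch with placeholder column names, not taken from the thread.

```python
from pyspark.sql import functions as F

# Explode a nested array-of-struct column, then promote its fields to
# top-level columns so later transformations operate on a flat schema.
flat = (
    df.select("id", F.explode("events").alias("event"))
      .select("id", "event.*")
)
```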