Re: is it possible to run spark2 on EMR 7.2.0

2024-09-20 Thread Prem Sahoo
It is not possible, as EMR 7.2.0 comes with Hadoop 3.x and Spark 3.x by default.
If you are looking to migrate from Spark 2 to 3, then use EMR 6.x, probably 6.2.
Sent from my iPhone

> On Sep 20, 2024, at 9:18 AM, joachim rodrigues  
> wrote:
> 
> 
> I'd like to start a migration from Spark 2 to Spark 3. Is it possible to migrate 
> my EMR cluster to emr-7.2.0 and migrate progressively, meaning that I can run 
> Spark 2 on emr-7.2.0, which is dedicated to Spark 3?


Re: ERROR: GROUP BY position 0 is not in select list , when using catalyst parser

2024-09-17 Thread joshita mishra
Unsubscribe

On Wed, Sep 18, 2024, 09:39 Sudhanshu  wrote:

> unsubscribe
>
> On Wed, Sep 11, 2024 at 1:51 PM Rommel Holmes 
> wrote:
>
>> I am using Spark 3.3.1.
>> Here is the sql_string used to query a ds-partitioned table:
>>
>> ```
>> SELECT
>> '2024-09-09' AS ds,
>>  AVG(v1) AS avg_v1,
>>  AVG(v2) AS avg_v2,
>>  AVG(v3) AS avg_v3
>> FROM schema.t1
>>  WHERE ds = '2024-09-09'
>>  GROUP BY 1
>> ```
>>
>> If I pass the sql_string directly into spark.sql(sql_string), it
>> executes without issue.
>>
>> If I pass the string into the Catalyst parser, here is the logical_plan in
>> string representation:
>> ```
>> Aggregate [1], [2024-09-09 AS ds#164, 'AVG('v1) AS avg_v1#165, 'AVG('v2)
>> AS avg_v2#166, 'AVG('v3) AS avg_v3#167]
>> +- 'Filter ('ds = 2024-09-09)
>>+- 'UnresolvedRelation [schema, t1], [], false
>> ```
>>
>> I then want to execute the logical plan:
>> ```
>> val tracker = new QueryPlanningTracker()
>> // Analyze the logical plan
>> val analyzedPlan =
>> sparkSession.sessionState.analyzer.executeAndTrack(logical_plan, tracker)
>> // Optimize the analyzed plan
>> val optimizedPlan =
>> sparkSession.sessionState.optimizer.executeAndTrack(analyzedPlan, tracker)
>> ```
>>
>> It throws the following error:
>> > [GROUP_BY_POS_OUT_OF_RANGE] GROUP BY position 0 is not in select list
>> (valid range is [1, 4])
>>
>>
>> --
>>  Yours
>>  Rommel
>>
>>
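
A sketch of an alternative path worth trying (it relies on the same internal sessionState APIs as the snippet above, so the same accessibility caveats apply; whether it avoids the GROUP_BY_POS_OUT_OF_RANGE error depends on what actually differs between the two code paths, which the plan string alone does not show): instead of invoking the analyzer and optimizer rule executors directly, parse with the session's parser and let the session build a QueryExecution, so the same analyze/optimize pipeline that backs spark.sql is applied. Table and column names below are the ones from the original mail.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parse-then-execute").getOrCreate()

val sqlString =
  """SELECT '2024-09-09' AS ds,
    |       AVG(v1) AS avg_v1,
    |       AVG(v2) AS avg_v2,
    |       AVG(v3) AS avg_v3
    |FROM schema.t1
    |WHERE ds = '2024-09-09'
    |GROUP BY 1""".stripMargin

// Parse with the session's own parser (the one spark.sql uses).
val parsedPlan = spark.sessionState.sqlParser.parsePlan(sqlString)

// Let the session build a QueryExecution; analyzed and optimizedPlan are lazy
// vals that run the full analyzer and optimizer with the session's configuration.
val qe = spark.sessionState.executePlan(parsedPlan)
val analyzedPlan  = qe.analyzed
val optimizedPlan = qe.optimizedPlan
```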


Re: ERROR: GROUP BY position 0 is not in select list , when using catalyst parser

2024-09-17 Thread Sudhanshu
unsubscribe

On Wed, Sep 11, 2024 at 1:51 PM Rommel Holmes 
wrote:

> i am using spark 3.3.1
> here is the sql_string to query a ds partitioned table
>
> ```
> SELECT
> '2024-09-09' AS ds,
>  AVG(v1) AS avg_v1,
>  AVG(v2) AS avg_v2,
>  AVG(v3) AS avg_v3
> FROM schema.t1
>  WHERE ds = '2024-09-09'
>  GROUP BY 1
> ```
>
> if i am passing the sql_string directly into spark.sql(sql_string), it can
> execute without issue.
>
> if i pass the string into catalyst parser, here is the logical_plan in
> string representation.
> ```
> Aggregate [1], [2024-09-09 AS ds#164, 'AVG('v1) AS avg_v1#165, 'AVG('v2)
> AS avg_v2#166, 'AVG('v3) AS avg_v3#167]
> +- 'Filter ('ds = 2024-09-09)
>+- 'UnresolvedRelation [schema, t1], [], false
> ```
>
> and i want to execute the logical plan
> ```
> val tracker = new QueryPlanningTracker()
> // Analyze the logical plan
> val analyzedPlan =
> sparkSession.sessionState.analyzer.executeAndTrack(logical_plan, tracker)
> // Optimize the analyzed plan
> optimizedPlan =
> sparkSession.sessionState.optimizer.executeAndTrack(analyzedPlan, tracker)
> ```
>
> it will throw error msg as
> >[GROUP_BY_POS_OUT_OF_RANGE] GROUP BY position 0 is not in select list
> (valid range is [1, 4])
>
>
> --
>  Yours
>  Rommel
>
>


Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?

2024-09-09 Thread Nagatomi Yasukazu
I apologize if my previous explanation was unclear, and I realize I didn’t
provide enough context for my question.

The reason I want to submit a Spark application to a Kubernetes cluster
using the Spark Operator is that I want to use Kubernetes as the Cluster
Manager, rather than Standalone mode. This would allow the Spark Connect
Driver and Executors to run on different nodes within the Kubernetes
cluster.

I understand that it is currently possible to launch Spark Connect by
setting the Cluster Manager to Standalone. However, in that case, the
Driver and Executors run on the same node, which I believe would not scale
efficiently.

Therefore, I am considering specifying Kubernetes (specifically the
Kubernetes API) as the Cluster Manager to dynamically distribute and
schedule the Driver and multiple Executor Pods across all nodes in the
Kubernetes cluster.

With the Spark Operator, it is easy to specify Kubernetes as the Cluster
Manager, but the Spark Operator does not allow the use of the "client"
deploy mode (i.e., the deploy mode will be "cluster"). On the other hand,
Spark Connect does not support the "cluster" deploy mode, leading to a
deadlock between the two specifications.

That is why I wanted to understand the reason why Spark Connect does not
allow the "cluster" deploy mode, and this was the main point of my original
question.
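
For reference, a rough sketch of the standalone path that is supported today (the setup described further down this thread); host names, the Scala suffix, and the Spark version are placeholders:

```
# on the designated master node
$SPARK_HOME/sbin/start-master.sh

# on each worker node
$SPARK_HOME/sbin/start-worker.sh spark://master-host:7077

# start the Spark Connect server against the standalone master
# (client deploy mode: the Connect driver runs where this command is executed)
$SPARK_HOME/sbin/start-connect-server.sh \
  --master spark://master-host:7077 \
  --packages org.apache.spark:spark-connect_2.12:3.5.2
```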

2024年9月10日(火) 0:29 Prabodh Agarwal :

> Oh. This issue is pretty straightforward to solve actually. Particularly,
> in spark-3.5.2.
>
> Just download the `spark-connect` maven jar and place it in
> `$SPARK_HOME/jars`. Then rebuild the docker image. I saw that I had posted
> a comment on this Jira as well. I could fix this up for standalone cluster
> at least this way.
>
> On Mon, Sep 9, 2024 at 7:04 PM Nagatomi Yasukazu 
> wrote:
>
>> Hi Prabodh,
>>
>> Thank you for your response.
>>
>> As you can see from the following JIRA issue, it is possible to run the
>> Spark Connect Driver on Kubernetes:
>>
>> https://issues.apache.org/jira/browse/SPARK-45769
>>
>> However, this issue describes a problem that occurs when the Driver and
>> Executors are running on different nodes. This could potentially be the
>> reason why only Standalone mode is currently supported, but I am not
>> certain about it.
>>
>> Thank you for your attention.
>>
>>
>> 2024年9月9日(月) 12:40 Prabodh Agarwal :
>>
>>> My 2 cents regarding my experience with using spark connect in cluster
>>> mode.
>>>
>>> 1. Create a spark cluster of 2 or more nodes. Make 1 node as master &
>>> other nodes as workers. Deploy spark connect pointing to the master node.
>>> This works well. The approach is not well documented, but I could figure
>>> it out by hit-and-trial.
>>> 2. In k8s, by default; we can actually get the executors to run on
>>> kubernetes itself. That is pretty straightforward, but the driver continues
>>> to run on a local machine. But yeah, I agree as well, making the driver to
>>> run on k8s itself would be slick.
>>>
>>> Thank you.
>>>
>>>
>>> On Mon, Sep 9, 2024 at 6:17 AM Nagatomi Yasukazu 
>>> wrote:
>>>
 Hi All,

 Why is it not possible to specify cluster as the deploy mode for Spark
 Connect?

 As discussed in the following thread, it appears that there is an
 "arbitrary decision" within spark-submit that "Cluster mode is not
 applicable" to Spark Connect.

 GitHub Issue Comment:

 https://github.com/kubeflow/spark-operator/issues/1801#issuecomment-2000494607

 > This will circumvent the submission error you may have gotten if you
 tried to just run the SparkConnectServer directly. From my investigation,
 that looks to be an arbitrary decision within spark-submit that Cluster
 mode is "not applicable" to SparkConnect. Which is sort of true except when
 using this operator :)

 I have reviewed the following commit and pull request, but I could not
 find any discussion or reason explaining why cluster mode is not available:

 Related Commit:

 https://github.com/apache/spark/commit/11260310f65e1a30f6b00b380350e414609c5fd4

 Related Pull Request:
 https://github.com/apache/spark/pull/39928

 This restriction poses a significant obstacle when trying to use Spark
 Connect with the Spark Operator. If there is a technical reason for this, I
 would like to know more about it. Additionally, if this issue is being
 tracked on JIRA or elsewhere, I would appreciate it if you could provide a
 link.

 Thank you in advance.

 Best regards,
 Yasukazu Nagatomi

>>>


Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?

2024-09-09 Thread Prabodh Agarwal
Oh. This issue is pretty straightforward to solve actually. Particularly,
in spark-3.5.2.

Just download the `spark-connect` maven jar and place it in
`$SPARK_HOME/jars`. Then rebuild the docker image. I saw that I had posted
a comment on this Jira as well. I could fix this up for standalone cluster
at least this way.
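
A minimal sketch of that fix (the artifact coordinates follow the usual Maven layout for spark-connect_2.12 3.5.2 and are an assumption; the image tag is a placeholder; adjust to your Spark/Scala versions):

```
# download the Spark Connect server jar into the jars directory that gets
# baked into the image, then rebuild the image
curl -fL -o "$SPARK_HOME/jars/spark-connect_2.12-3.5.2.jar" \
  https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.2/spark-connect_2.12-3.5.2.jar
docker build -t my-spark:3.5.2-connect .
```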

On Mon, Sep 9, 2024 at 7:04 PM Nagatomi Yasukazu 
wrote:

> Hi Prabodh,
>
> Thank you for your response.
>
> As you can see from the following JIRA issue, it is possible to run the
> Spark Connect Driver on Kubernetes:
>
> https://issues.apache.org/jira/browse/SPARK-45769
>
> However, this issue describes a problem that occurs when the Driver and
> Executors are running on different nodes. This could potentially be the
> reason why only Standalone mode is currently supported, but I am not
> certain about it.
>
> Thank you for your attention.
>
>
> 2024年9月9日(月) 12:40 Prabodh Agarwal :
>
>> My 2 cents regarding my experience with using spark connect in cluster
>> mode.
>>
>> 1. Create a spark cluster of 2 or more nodes. Make 1 node as master &
>> other nodes as workers. Deploy spark connect pointing to the master node.
>> This works well. The approach is not well documented, but I could figure
>> it out by hit-and-trial.
>> 2. In k8s, by default; we can actually get the executors to run on
>> kubernetes itself. That is pretty straightforward, but the driver continues
>> to run on a local machine. But yeah, I agree as well, making the driver to
>> run on k8s itself would be slick.
>>
>> Thank you.
>>
>>
>> On Mon, Sep 9, 2024 at 6:17 AM Nagatomi Yasukazu 
>> wrote:
>>
>>> Hi All,
>>>
>>> Why is it not possible to specify cluster as the deploy mode for Spark
>>> Connect?
>>>
>>> As discussed in the following thread, it appears that there is an
>>> "arbitrary decision" within spark-submit that "Cluster mode is not
>>> applicable" to Spark Connect.
>>>
>>> GitHub Issue Comment:
>>>
>>> https://github.com/kubeflow/spark-operator/issues/1801#issuecomment-2000494607
>>>
>>> > This will circumvent the submission error you may have gotten if you
>>> tried to just run the SparkConnectServer directly. From my investigation,
>>> that looks to be an arbitrary decision within spark-submit that Cluster
>>> mode is "not applicable" to SparkConnect. Which is sort of true except when
>>> using this operator :)
>>>
>>> I have reviewed the following commit and pull request, but I could not
>>> find any discussion or reason explaining why cluster mode is not available:
>>>
>>> Related Commit:
>>>
>>> https://github.com/apache/spark/commit/11260310f65e1a30f6b00b380350e414609c5fd4
>>>
>>> Related Pull Request:
>>> https://github.com/apache/spark/pull/39928
>>>
>>> This restriction poses a significant obstacle when trying to use Spark
>>> Connect with the Spark Operator. If there is a technical reason for this, I
>>> would like to know more about it. Additionally, if this issue is being
>>> tracked on JIRA or elsewhere, I would appreciate it if you could provide a
>>> link.
>>>
>>> Thank you in advance.
>>>
>>> Best regards,
>>> Yasukazu Nagatomi
>>>
>>


Re: [CONNECT] Why Can't We Specify Cluster Deploy Mode for Spark Connect?

2024-09-09 Thread Nagatomi Yasukazu
Hi Prabodh,

Thank you for your response.

As you can see from the following JIRA issue, it is possible to run the
Spark Connect Driver on Kubernetes:

https://issues.apache.org/jira/browse/SPARK-45769

However, this issue describes a problem that occurs when the Driver and
Executors are running on different nodes. This could potentially be the
reason why only Standalone mode is currently supported, but I am not
certain about it.

Thank you for your attention.


2024年9月9日(月) 12:40 Prabodh Agarwal :

> My 2 cents regarding my experience with using spark connect in cluster
> mode.
>
> 1. Create a spark cluster of 2 or more nodes. Make 1 node as master &
> other nodes as workers. Deploy spark connect pointing to the master node.
> This works well. The approach is not well documented, but I could figure
> it out by hit-and-trial.
> 2. In k8s, by default; we can actually get the executors to run on
> kubernetes itself. That is pretty straightforward, but the driver continues
> to run on a local machine. But yeah, I agree as well, making the driver to
> run on k8s itself would be slick.
>
> Thank you.
>
>
> On Mon, Sep 9, 2024 at 6:17 AM Nagatomi Yasukazu 
> wrote:
>
>> Hi All,
>>
>> Why is it not possible to specify cluster as the deploy mode for Spark
>> Connect?
>>
>> As discussed in the following thread, it appears that there is an
>> "arbitrary decision" within spark-submit that "Cluster mode is not
>> applicable" to Spark Connect.
>>
>> GitHub Issue Comment:
>>
>> https://github.com/kubeflow/spark-operator/issues/1801#issuecomment-2000494607
>>
>> > This will circumvent the submission error you may have gotten if you
>> tried to just run the SparkConnectServer directly. From my investigation,
>> that looks to be an arbitrary decision within spark-submit that Cluster
>> mode is "not applicable" to SparkConnect. Which is sort of true except when
>> using this operator :)
>>
>> I have reviewed the following commit and pull request, but I could not
>> find any discussion or reason explaining why cluster mode is not available:
>>
>> Related Commit:
>>
>> https://github.com/apache/spark/commit/11260310f65e1a30f6b00b380350e414609c5fd4
>>
>> Related Pull Request:
>> https://github.com/apache/spark/pull/39928
>>
>> This restriction poses a significant obstacle when trying to use Spark
>> Connect with the Spark Operator. If there is a technical reason for this, I
>> would like to know more about it. Additionally, if this issue is being
>> tracked on JIRA or elsewhere, I would appreciate it if you could provide a
>> link.
>>
>> Thank you in advance.
>>
>> Best regards,
>> Yasukazu Nagatomi
>>
>


Re: Spark Thrift Server - Not Scaling Down Executors 3.4.2+

2024-09-05 Thread Cheng Pan
The default value of spark.dynamicAllocation.shuffleTracking.enabled was 
changed from false to true in Spark 3.4.0; disabling it might help.

[1] 
https://spark.apache.org/docs/latest/core-migration-guide.html#upgrading-from-core-33-to-34

Thanks,
Cheng Pan
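
A quick way to test this suggestion (a sketch; note that with dynamic allocation enabled, disabling shuffle tracking means one of the other conditions has to hold instead, e.g. an external shuffle service, decommissioning, or a reliable-storage ShuffleDataIO, as discussed in the SPARK-45858 thread below):

```
# spark-defaults.conf, or pass as --conf when starting the Thrift Server
spark.dynamicAllocation.shuffleTracking.enabled   false
```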



> On Sep 6, 2024, at 00:36, Jayabindu Singh  wrote:
> 
> Dear Spark Users,
> 
> We have run into an issue where, with Spark 3.3.2, auto scaling with STS 
> works fine, but with 3.4.2 or 3.5.2 executors are being left behind and 
> not scaling down. 
> The driver makes a call to remove the executor, but some (not all) executors never 
> get removed.
> 
> Has anyone else noticed this or aware of any reported issues?
> 
> Any help will be greatly appreciated.
> 
> Regards
> Jay
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Question about Releases and EOL

2024-08-29 Thread Mich Talebzadeh
CCed to spark dev as well

Ok this is my take

The EOL for Spark releases is determined from my experience by a
combination of factors, including:

1) Community Support, the level of community activity and contributions to
a particular release branch plays a significant role. If a branch continues
to receive active development and bug fixes, it's more likely to be
extended.
2) Usage, the number of users and organizations relying on a specific
release also influences its EOL. Widely-used releases are more likely to be
maintained for longer periods. For example, for a good while, Spark 3.x was
widely used by Cloud vendors including AWS and Google
3) Technical Feasibility, maintaining older releases can sometimes be
challenging due to changes in underlying technologies, dependencies, and
security vulnerabilities. If it becomes difficult or impractical to support
a release, its EOL may be accelerated like any other product, basically the
shelf life
4) Alignment with the underlying technologies,  Spark's EOL can also be
influenced by the EOL of the underlying technologies it relies on, such as
Hadoop, Scala, Java and Python. If a major component reaches its EOL, it
can create challenges in maintaining older Spark releases.
5) Security patches, the availability of security patches for a
particular Spark release is another important factor. If a release
continues to receive security updates, it is more likely to be maintained
for longer.

Determining LTS Releases:

1) Official Announcements, Spark releases are usually announced on the
Apache Spark mailing lists. These announcements often indicate which
releases are designated as Long-Term Support (LTS). I suggest you add
yourself to the Spark dev list.
2) Release Notes, check the release notes for individual Spark versions.
They might contain information about a release's LTS status.
3) Community discussions, engage with the Spark community on forums or
mailing lists. Discussions about EOL timelines and LTS releases often take
place there.
4) Regarding Spark 2.4.8, while I did not find a specific announcement
declaring Spark 2.4.0 as the final minor release, the fact that 2.4.8 is
still being maintained suggests it might be an LTS release. This is likely
due to its continued usage.

HTH

Mich Talebzadeh,
Architect | Data Engineer | Data Science | Financial Crime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 29 Aug 2024 at 17:17, Miles Fryhover (V)
 wrote:

> Hello,
>
> I am looking into Spark and how the EOL date is determined for a given
> release. I have been able to find that, in general, feature release
> branches are generally maintained for 18 months. However, the last minor
> release (which I assume is equivalent to feature release) within a major
> release will be maintained for longer.
>
> How are these “LTS” releases determined? Is there anywhere in the
> documentation that indicates which branches are considered LTS? The example
> uses 2.4.8 however I’m unable to find any announcement indicating that
> 2.4.0 was to be the final minor release for the major version of 2.
>
> Any help on finding these/answering this question would be helpful for us.
> Thanks!
>
> 
> Miles Fryhover
> Information Security - Assurance
> Apple
> One Apple Park Way, MS 141-1AIS
> Cupertino, CA 95014, USA
> Office 1-437-268-1679
> iPhone 1-405-615-7737
>
> This email and any attachments may be privileged and may contain
> confidential information intended only for the recipient(s) named above.
> Any other distribution, forwarding, copying or disclosure of this message
> is strictly prohibited. If you have received this email in error, please
> notify me immediately by telephone or return email, and delete this message
> from your system.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: unable to deploy Pyspark application on GKE, Spark installed using bitnami helm chart

2024-08-27 Thread Mat Schaffer
I use https://github.com/kubeflow/spark-operator rather than bitnami chart,
but https://medium.com/@kayvan.sol2/spark-on-kubernetes-d566158186c6 shows
running spark submit from a master pod exec. Might be something to try.
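
Another angle worth noting (a sketch, independent of the Bitnami chart): the "Cluster deploy mode is currently not supported for python applications on standalone clusters" error below is specific to a standalone master (spark://...); submitting with Kubernetes itself as the cluster manager (k8s://...) does support cluster deploy mode for PySpark. The API server address, namespace, service account, image name, and in-image application path are placeholders:

```
$SPARK_HOME/bin/spark-submit \
  --master k8s://https://<k8s-apiserver-host>:443 \
  --deploy-mode cluster \
  --name spark-on-gke \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  local:///opt/spark/app/StructuredStream-on-gke.py
```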

On Mon, Aug 26, 2024 at 12:22 PM karan alang  wrote:

> We are currently using Dataproc on GCP for running our spark workloads,
> and i'm planning to move this workload to Kubernetes(GKE).
>
> Here is what is done so far :
>
> Installed Spark using bitnami helm chart:
>
> ```
>
> helm repo add bitnami https://charts.bitnami.com/bitnami
>
> helm install spark -f sparkConfig.yaml bitnami/spark -n spark
>
> ```
>
> Also, deployed a loadbalancer, yaml used :
>
> ```
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: spark-master-lb
>   labels:
>     app: spark
>     component: LoadBalancer
> spec:
>   selector:
>     app.kubernetes.io/component: master
>     app.kubernetes.io/instance: spark
>     app.kubernetes.io/name: spark
>   ports:
>   - name: webui
>     port: 8080
>     targetPort: 8080
>   - name: master
>     port: 7077
>     targetPort: 7077
>   type: LoadBalancer
>
> ```
>
> Spark is installed, and the pods have come up.
>
> When i try to do a spark-submit in cluster mode, it gives following error:
>
> ```
>
> (base) Karans-MacBook-Pro:fromEdward-jan26 karanalang$ 
> $SPARK_HOME/bin/spark-submit   --master spark://:7077   
> --deploy-mode cluster   --name spark-on-gke   
> local:///Users/karanalang/Documents/Technology/0.spark-on-gke/StructuredStream-on-gke.py24/08/26
>  12:03:26 WARN Utils: Your hostname, Karans-MacBook-Pro.local resolves to a 
> loopback address: 127.0.0.1; using 10.42.28.138 instead (on interface 
> en0)24/08/26 12:03:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
> another addressWARNING: An illegal reflective access operation has 
> occurredWARNING: Illegal reflective access by 
> org.apache.spark.unsafe.Platform 
> (file:/Users/karanalang/Documents/Technology/spark-3.1.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.3.jar)
>  to constructor java.nio.DirectByteBuffer(long,int)WARNING: Please consider 
> reporting this to the maintainers of org.apache.spark.unsafe.PlatformWARNING: 
> Use --illegal-access=warn to enable warnings of further illegal reflective 
> access operationsWARNING: All illegal access operations will be denied in a 
> future release
> Exception in thread "main" org.apache.spark.SparkException: Cluster deploy 
> mode is currently not supported for python applications on standalone 
> clusters.
> at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:968)
> at 
> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:273)
> at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> ```
>
> In client mode, it gives the following error :
>
> 24/08/26 12:06:58 ERROR SparkContext: Error initializing SparkContext.
> java.lang.NullPointerException
> at org.apache.spark.SparkContext.(SparkContext.scala:640)
> at 
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
> at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
> at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:238)
> at 
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:829)24/08/26 12:06:58 INFO 
> SparkContext: SparkContext already stopped.
>
> Couple of questions:
>
> 1. Is using the helm chart the correct way to install Apache Spark on
>    GKE/k8s? (Note - need to install on both GKE and on-prem Kubernetes.)
> 2. How to submit pyspark jobs on a Spark cluster deployed on GKE (e.g. do I
>    need to create a K8s deployment for each Spark job?)
>
> tia !
>
> Here is the stackoverflow link :
>
>
> https://stackoverflow.com/questions/78915988/unab

Re: Spark Reads from MapR and Write to MinIO fails for few batches

2024-08-24 Thread Prem Sahoo
Issue resolved, thanks for your time folks.

Sent from my iPhone

> On Aug 21, 2024, at 5:38 PM, Prem Sahoo  wrote:
>
> Hello Team,
> Could you please check on this request ?
>
> On Mon, Aug 19, 2024 at 7:00 PM Prem Sahoo  wrote:
>
>> Hello Spark and User,
>> could you please shed some light ?
>>
>> On Thu, Aug 15, 2024 at 7:15 PM Prem Sahoo  wrote:
>>
>>> Hello Spark and User,
>>> we have a Spark project which is a long running Spark session where it does below
>>> 1. We are reading from Mapr FS and writing to MapR FS.
>>> 2. Another parallel job which reads from MapR Fs and Writes to MinIO object storage.
>>>
>>> We are finding issues for a few batches of Spark jobs which one writes to MinIO, reads empty data frame/dataset from MapR but the job which reads from & writes to MapR Fs for the same batches never had any issue.
>>>
>>> I was just going through some blogs and stackoverflow to know that Spark Session which holds both information/config of MapR and Minio sometimes find this issue as Spark Session or context has no correct information so either we need to clear or restart spark session for each batch.
>>>
>>> Please let me know if you have any suggestions to get rid of this issue.




Re: Batch to Kafka

2024-08-23 Thread Rommel Holmes
Hi, community

Right now i am using the batch to kafka in pyspark to send dataframe into
kafka
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-the-output-of-batch-queries-to-kafka


The solution works when the dataframe is not too big.
When the dataframe is big, since the write to Kafka is synchronous, it
doesn't batch data at all, so each call to Kafka carries only 1-2
records (basically 1-2 rows in the dataframe), and the performance is pretty
bad. I even set kafka.batch.size to 500k, but I don't think it is taking effect
due to the synchronous producing.

I see a lot of changes happening on the Structured Streaming side, but the batch
write to Kafka doesn't seem to have changed much, or at least I didn't find any changes.

Is there any way I can make the write to Kafka asynchronous, or is there a
way to make the producer batch more data through a config I missed?

Thank you.
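
One thing worth checking (a sketch; whether it helps depends on why the producer batches stay small): kafka.-prefixed options on the batch writer are passed straight to the Kafka producer, so setting kafka.linger.ms together with kafka.batch.size gives the producer a chance to fill batches, and coalescing to fewer output partitions gives each task's producer more rows to batch. The same options apply verbatim from the PySpark writer; the broker address and topic below are placeholders, and the range-based dataframe stands in for the real one.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("batch-to-kafka").getOrCreate()

// stand-in dataframe; the Kafka sink expects a string/binary "value" column
val df = spark.range(100000).selectExpr("CAST(id AS STRING) AS value")

df.coalesce(32)                                        // fewer, fuller tasks
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")    // placeholder
  .option("topic", "my-topic")                         // placeholder
  .option("kafka.linger.ms", "50")                     // let the producer fill batches
  .option("kafka.batch.size", "524288")                // 512 KiB producer batches
  .option("kafka.compression.type", "lz4")
  .save()
```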

On Wed, Jul 31, 2024 at 2:38 PM Rommel Holmes 
wrote:

> Hi, community
>
> Right now i am using the batch to kafka in pyspark to send dataframe into
> kafka
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-the-output-of-batch-queries-to-kafka
>
>
> The solution works with not so big dataframe.
> While the dataframe is big, since the write to kafka is synchronize, it
> doesn't batch data at all, so each call to kafka is only 1-2
> records (basically 1-2 row in the dataframe), and the performance is pretty
> bad. I even set kafka.batch.size to 500k, but i don't think it is working
> due to synchronous producing.
>
> I see a lot of changes happening on the structure streaming side, but
> batch to kafka seems not much changes, or at least i didn't find any.
>
> Is there anyway i can make the write to kafka asynchronous, or is there a
> way to make the producer batch more data that the config i missed?
>
> Thank you.
>
>
> --
>  Yours
>  Rommel
>
>

-- 
 Yours
 Rommel
*
  I  waited patiently for the LORD;
   he turned to me and heard my cry.
 He lifted me out of the slimy pit,
   out of the mud and mire;
he set my feet on a rock
   and gave me a firm place to stand. *


Re: Spark Reads from MapR and Write to MinIO fails for few batches

2024-08-21 Thread Prem Sahoo
Hello Team,
Could you please check on this request ?

On Mon, Aug 19, 2024 at 7:00 PM Prem Sahoo  wrote:

> Hello Spark and User,
> could you please shed some light ?
>
> On Thu, Aug 15, 2024 at 7:15 PM Prem Sahoo  wrote:
>
>> Hello Spark and User,
>> we have a  Spark project which is a long running Spark session where  it
>> does below
>> 1. We are reading from  Mapr FS and writing to MapR FS.
>> 2. Another parallel job which reads from MapR Fs and Writes to MinIO
>> object storage.
>>
>> We are finding issues for a few batches of Spark jobs which one writes to
>> MinIO , reads empty data frame/dataset from MapR but the job which reads
>> from  & writes to  MapR Fs for the same batches never had any issue.
>>
>> I was just going through some blogs and stackoverflow to know that Spark
>> Session which holds both information /config of MapR and Minio sometimes
>> find this issue as Spark Session or context has no correct  information so
>> either we need to clear or restart spark session for each batch.
>>
>>
>> Please let me know if you have any suggestions to get rid of this issue.
>>
>>


Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-20 Thread Cheng Pan
I searched [1] using the keywords “reliable” and got nothing, so I cannot draw 
the same conclusion as you.

If an implementation claims to support reliable storage, it should inherit 
interface ShuffleDriverComponents and override method supportsReliableStorage 
[2] to return true, for example, Apache Celeborn [3], a Remote Shuffle Service 
for Spark.

Thanks,
Cheng Pan

[1] 
https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage
[2] 
https://github.com/apache/spark/blob/v3.5.2/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java#L65-L72
[3] 
https://github.com/apache/celeborn/blob/v0.5.1/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/CelebornShuffleDataIO.java#L56
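
For concreteness, the reliable-storage route looks roughly like the following (class name taken from link [3]; a running Celeborn cluster and its client jars and configuration are prerequisites not shown here):

```
# spark-defaults.conf sketch: a ShuffleDataIO whose driver components report
# supportsReliableStorage = true satisfies condition 4); the Celeborn client
# additionally needs its own settings (shuffle manager, master endpoints) per
# the Celeborn documentation.
spark.shuffle.sort.io.plugin.class   org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
```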

> On Aug 20, 2024, at 18:42, Aaron Grubb  wrote:
> 
> Hi Cheng,
> 
> Due to the documentation [1]. This is why I suggested at the end of the 
> message you replied to that documentation should be updated or
> clarified. Can you explain how persistent volume claims in Kubernetes are 
> "unreliable" storage?
> 
> Thanks,
> Aaron
> 
> https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage
> 
> On Tue, 2024-08-20 at 18:37 +0800, Cheng Pan wrote:
>> org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO does NOT support 
>> reliable storage, so the condition 4) is false even with this
>> configuration.
>> I’m not sure why you think it does.
>> 
>> Thanks,
>> Cheng Pan
>> 
>>> On Aug 20, 2024, at 18:27, Aaron Grubb  wrote:
>>> 
>>> Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the 
>>> job however it still was not stable in the face of spot
>>> instances
>>> going away. Adding spark.decommission.enabled=true, 
>>> spark.storage.decommission.enabled=true and
>>> spark.executor.decommission.killInterval=110
>>> appears to have completely stabilized the job (not sure which did the trick 
>>> as I added them at the same time). Perhaps extra
>>> documentation or
>>> clarifications should be added as it doesn't seem clear to me how to 
>>> arrivate at job stability using dynamic allocation without trial and
>>> error.
>>> 
>>> On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote:
 Hi all,
 
 I'm running Spark on Kubernetes on AWS using only spot instances for 
 executors with dynamic allocation enabled. This particular job is
 being
 triggered by Airflow and it hit this bug [1] 6 times in a row. However, I 
 had recently switched to using PersistentVolumeClaims in
 Spark
 with
 spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
  but kept
 spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see 
 under the notes for spark.dynamicAllocation.enabled [2] that
 these
 configurations are "or" not "and". However, when setting 
 spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with 
 the
 message
 
 org.apache.spark.SparkException: Dynamic allocation of executors requires 
 one of the following conditions: 1) enabling external shuffle
 service through spark.shuffle.service.enabled. 2) enabling shuffle 
 tracking through spark.dynamicAllocation.shuffleTracking.enabled. 3)
 enabling shuffle blocks decommission through spark.decommission.enabled 
 and spark.storage.decommission.shuffleBlocks.enabled. 4)
 (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a 
 custom ShuffleDataIO who's ShuffleDriverComponents supports
 reliable
 storage.
 
 Am I hitting this bug unavoidably? Or is there a configuration I'm missing 
 to enable
 spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
  to replace
 spark.dynamicAllocation.shuffleTracking.enabled=true?
 
 Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
 
 spark.checkpoint.compress  
 true
 spark.driver.cores 
   1
 spark.driver.maxResultSize 
 2g
 spark.driver.memory
 5140m
 spark.dynamicAllocation.enabled
 true
 spark.dynamicAllocation.executorAllocationRatio
 0.33
 spark.dynamicAllocation.maxExecutors   
 20
 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout   
 30
 spark.eventLog.enabled 
 true
 spark.executor.cores   

Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-20 Thread Cheng Pan
org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO does NOT support 
reliable storage, so the condition 4) is false even with this configuration.
I’m not sure why you think it does.

Thanks,
Cheng Pan

> On Aug 20, 2024, at 18:27, Aaron Grubb  wrote:
> 
> Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job 
> however it still was not stable in the face of spot instances
> going away. Adding spark.decommission.enabled=true, 
> spark.storage.decommission.enabled=true and 
> spark.executor.decommission.killInterval=110
> appears to have completely stabilized the job (not sure which did the trick 
> as I added them at the same time). Perhaps extra documentation or
> clarifications should be added as it doesn't seem clear to me how to arrivate 
> at job stability using dynamic allocation without trial and
> error.
> 
> On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote:
>> Hi all,
>> 
>> I'm running Spark on Kubernetes on AWS using only spot instances for 
>> executors with dynamic allocation enabled. This particular job is
>> being
>> triggered by Airflow and it hit this bug [1] 6 times in a row. However, I 
>> had recently switched to using PersistentVolumeClaims in Spark
>> with
>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>  but kept
>> spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see 
>> under the notes for spark.dynamicAllocation.enabled [2] that these
>> configurations are "or" not "and". However, when setting 
>> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with 
>> the
>> message
>> 
>> org.apache.spark.SparkException: Dynamic allocation of executors requires 
>> one of the following conditions: 1) enabling external shuffle
>> service through spark.shuffle.service.enabled. 2) enabling shuffle tracking 
>> through spark.dynamicAllocation.shuffleTracking.enabled. 3)
>> enabling shuffle blocks decommission through spark.decommission.enabled and 
>> spark.storage.decommission.shuffleBlocks.enabled. 4)
>> (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a 
>> custom ShuffleDataIO who's ShuffleDriverComponents supports reliable
>> storage.
>> 
>> Am I hitting this bug unavoidably? Or is there a configuration I'm missing 
>> to enable
>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>  to replace
>> spark.dynamicAllocation.shuffleTracking.enabled=true?
>> 
>> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>> 
>> spark.checkpoint.compress
>>   true
>> spark.driver.cores   
>> 1
>> spark.driver.maxResultSize   
>>   2g
>> spark.driver.memory  
>>   5140m
>> spark.dynamicAllocation.enabled  
>>   true
>> spark.dynamicAllocation.executorAllocationRatio  
>>   0.33
>> spark.dynamicAllocation.maxExecutors 
>>   20
>> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 
>>   30
>> spark.eventLog.enabled   
>>   true
>> spark.executor.cores 
>> 3
>> spark.executor.logs.rolling.enableCompression
>>   true
>> spark.executor.logs.rolling.maxRetainedFiles 
>>   48
>> spark.executor.logs.rolling.strategy 
>>   time
>> spark.executor.logs.rolling.time.interval
>>   hourly
>> spark.hadoop.fs.s3a.impl 
>>   org.apache.hadoop.fs.s3a.S3AFileSystem
>> spark.hadoop.fs.s3a.connection.ssl.enabled   
>>   false
>> spark.hadoop.fs.s3a.fast.upload  
>>   true
>> spark.kryo.registrationRequired  
>>   false
>> spark.kryo.unsafe
>>   false
>> spark.kryoserializer.buffer  
>>   1m
>> spark.kryoserializer.buffer.max  
>>   1g
>> spark.kubernetes.driver.limit.cores  
>>   750m
>> spark.kubernetes

Re: Hitting SPARK-45858 on Kubernetes - Unavoidable bug or misconfiguration?

2024-08-20 Thread Aaron Grubb
Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the job; 
however, it still was not stable in the face of spot instances
going away. Adding spark.decommission.enabled=true, 
spark.storage.decommission.enabled=true and 
spark.executor.decommission.killInterval=110
appears to have completely stabilized the job (not sure which did the trick as 
I added them at the same time). Perhaps extra documentation or
clarifications should be added, as it doesn't seem clear to me how to arrive 
at job stability using dynamic allocation without trial and
error.
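
In spark-defaults.conf form, the combination reported above (values exactly as stated in this mail; it was not isolated which of them is strictly required):

```
spark.shuffle.useOldFetchProtocol            true
spark.decommission.enabled                   true
spark.storage.decommission.enabled           true
spark.executor.decommission.killInterval     110
```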

On Mon, 2024-08-19 at 13:01 +, Aaron Grubb wrote:
> Hi all,
>
> I'm running Spark on Kubernetes on AWS using only spot instances for 
> executors with dynamic allocation enabled. This particular job is
> being
> triggered by Airflow and it hit this bug [1] 6 times in a row. However, I had 
> recently switched to using PersistentVolumeClaims in Spark
> with
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>  but kept
> spark.dynamicAllocation.shuffleTracking.enabled=true. Upon review, I see 
> under the notes for spark.dynamicAllocation.enabled [2] that these
> configurations are "or" not "and". However, when setting 
> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes with the
> message
>
> org.apache.spark.SparkException: Dynamic allocation of executors requires one 
> of the following conditions: 1) enabling external shuffle
> service through spark.shuffle.service.enabled. 2) enabling shuffle tracking 
> through spark.dynamicAllocation.shuffleTracking.enabled. 3)
> enabling shuffle blocks decommission through spark.decommission.enabled and 
> spark.storage.decommission.shuffleBlocks.enabled. 4)
> (Experimental) configuring spark.shuffle.sort.io.plugin.class to use a custom 
> ShuffleDataIO who's ShuffleDriverComponents supports reliable
> storage.
>
> Am I hitting this bug unavoidably? Or is there a configuration I'm missing to 
> enable
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>  to replace
> spark.dynamicAllocation.shuffleTracking.enabled=true?
>
> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>
> spark.checkpoint.compress 
>  true
> spark.driver.cores
>1
> spark.driver.maxResultSize
>  2g
> spark.driver.memory   
>  5140m
> spark.dynamicAllocation.enabled   
>  true
> spark.dynamicAllocation.executorAllocationRatio   
>  0.33
> spark.dynamicAllocation.maxExecutors  
>  20
> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout  
>  30
> spark.eventLog.enabled
>  true
> spark.executor.cores  
>3
> spark.executor.logs.rolling.enableCompression 
>  true
> spark.executor.logs.rolling.maxRetainedFiles  
>  48
> spark.executor.logs.rolling.strategy  
>  time
> spark.executor.logs.rolling.time.interval 
>  hourly
> spark.hadoop.fs.s3a.impl  
>  org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.fs.s3a.connection.ssl.enabled
>  false
> spark.hadoop.fs.s3a.fast.upload   
>  true
> spark.kryo.registrationRequired   
>  false
> spark.kryo.unsafe 
>  false
> spark.kryoserializer.buffer   
>  1m
> spark.kryoserializer.buffer.max   
>  1g
> spark.kubernetes.driver.limit.cores   
>  750m
> spark.kubernetes.driver.ownPersistentVolumeClaim  
>  true
> spark.kubernetes.driver.request.cores 
>  750m
> spark.kubernetes.driver.reusePersistentVolumeClaim
>  true
> spark.kubernetes.driver.waitToReusePersistentVolumeClaim  

Re: Handling load distribution and addressing data skew.

2024-08-19 Thread Raghavendra Ganesh
Hi,
Have you tried
https://spark.apache.org/docs/latest/sql-performance-tuning.html#spliting-skewed-shuffle-partitions
?


Another way of handling the skew is to split the task into multiple (2 or
more) stages, using a random salt as part of the key in the intermediate stages.

In the above case,


import org.apache.spark.sql.functions.{col, udf}
import scala.util.Random

val maxSalt = 20 // in my experience 2*sqrt(#numPartitions) works well for 2 stages
// nextInt(maxSalt) keeps the salt in [0, maxSalt); mark the UDF non-deterministic
// so the optimizer does not assume it returns the same value on re-evaluation
val randUdf = udf(() => Random.nextInt(maxSalt)).asNondeterministic()
df.withColumn("salt", randUdf())
  .repartition(numPartitions, col("salt"), col("key1")...)
  .drop("salt")
  .sortWithinPartitions(col("key1")...)
  .yourDfOperation(...)
  .repartition(numPartitions, col("key1")...)
  .sortWithinPartitions(col("key1")...)
  .yourDfOperation(...)

Another optimization you could try is to perform your dataframe operation
before the initial repartition, if you have a good number of fairly random
partitions.


Raghavendra


On Sat, Aug 17, 2024 at 12:33 PM Karthick 
wrote:

> Hi Team,
>
> I'm using repartition and sortWithinPartitions to maintain field-based
> ordering across partitions, but I'm facing data skewness among the
> partitions. I have 96 partitions, and I'm working with 500 distinct keys.
> While reviewing the Spark UI, I noticed that a few partitions are
> underutilized while others are overutilized.
>
> This seems to be a hashing problem. Can anyone suggest a better hashing
> technique or approach to mitigate this issue?
>
> Thanks in advance for your help.
>


Re: Spark Reads from MapR and Write to MinIO fails for few batches

2024-08-19 Thread Prem Sahoo
Hello Spark and User,
could you please shed some light ?

On Thu, Aug 15, 2024 at 7:15 PM Prem Sahoo  wrote:

> Hello Spark and User,
> we have a  Spark project which is a long running Spark session where  it
> does below
> 1. We are reading from  Mapr FS and writing to MapR FS.
> 2. Another parallel job which reads from MapR Fs and Writes to MinIO
> object storage.
>
> We are finding issues for a few batches of Spark jobs which one writes to
> MinIO , reads empty data frame/dataset from MapR but the job which reads
> from  & writes to  MapR Fs for the same batches never had any issue.
>
> I was just going through some blogs and stackoverflow to know that Spark
> Session which holds both information /config of MapR and Minio sometimes
> find this issue as Spark Session or context has no correct  information so
> either we need to clear or restart spark session for each batch.
>
>
> Please let me know if you have any suggestions to get rid of this issue.
>
>
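
Since the resolution was not described in the thread (see the later "Issue resolved" message above), here is a minimal sketch of the isolation step mentioned in this mail - taking a fresh session per batch via SparkSession.newSession(). newSession() isolates the SQL configuration, temporary views and UDF registry but shares the underlying SparkContext, so context-level Hadoop settings remain common; the paths below are placeholders.

```
import org.apache.spark.sql.SparkSession

val base = SparkSession.builder().appName("mapr-and-minio").getOrCreate()

def runBatch(batchId: String): Unit = {
  // fresh session state (SQL conf, temp views, UDFs) for this batch;
  // the SparkContext and its Hadoop configuration are still shared
  val session = base.newSession()
  val df = session.read.parquet(s"maprfs:///source/batch=$batchId")              // placeholder path
  df.write.mode("overwrite").parquet(s"s3a://minio-bucket/out/batch=$batchId")   // placeholder path
}
```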


Re: [External] Re: Redundant(?) shuffle after join

2024-08-19 Thread Ofir Manor
Shay - if I understand your question, you want to know if Spark has an 
optimization to eliminate shuffle from window functions in those conditions 
(when the window function partition key is equal to the bucket key,  after a 
join), and if so, why it does not apply...

Have you tried simpler variations just for debugging? Are their shuffles eliminated?

  1. Same as your query but without the join (just reading the large bucketized table
     and applying the window function).
  2. Same as your query but replacing the window function - row_number() over
     (partition by key_col order by sort_col) - with SELECT key_col, min(sort_col)
     ... GROUP BY key_col.

Might be a case where an optimization is possible but not implemented 🙁

Also - are your buckets sorted? by key_col + sort_key?  If so, why does Spark 
need to sort for ranking window function at all? (same for the two sorts for 
the join)
Maybe experiment a bit with spark.sql.legacy.bucketedTableScan.outputOrdering 
(again, forcing Spark to recognize that the data on disk is already sorted should 
eliminate the window sort - if that optimization were implemented).
Not sure though why that parameter is marked as legacy; maybe I am missing 
something.
Just my two cents,
   Ofir
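
A sketch of those two debug variants in spark-shell form (table and column names follow the placeholders used in this thread; a SparkSession named spark is assumed to be in scope), so the plans can be compared with and without the join:

```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val bucketed = spark.table("bucketed_table")   // bucketed (and sorted?) by key_col

// Variant 1: window function alone on the bucketed table, no join
val w = Window.partitionBy("key_col").orderBy("sort_col")
bucketed.withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .explain(true)

// Variant 2: the same reduction expressed as an aggregate instead of a window
bucketed.groupBy("key_col")
  .agg(min("sort_col").alias("min_sort_col"))
  .explain(true)
```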


From: Mich Talebzadeh 
Sent: Friday, August 16, 2024 7:54 PM
To: Shay Elbaz 
Cc: Shay Elbaz ; user@spark.apache.org 

Subject: [External] Re: Redundant(?) shuffle after join

Hi Shay,

Let me address the points you raised using the STAR methodology. I apologize if 
it sounds a bit formal, but I find it  effective for clarity.

Situation
You encountered an issue while working with a Spark DataFrame where a shuffle 
was unexpectedly triggered during the application of a window function. This 
happened even though the data was already partitioned and sorted by the key 
column (`key_col`). Specifically, the issue arose after joining a large, 
bucketed table with a smaller DataFrame on the same column used in the window 
function.

Task:
Your objective, as evident from your question, was to understand why Spark 
introduced a shuffle for the window function despite the data being 
pre-partitioned and sorted. In summary, we needed to identify the underlying 
cause of this behavior and explore possible solutions to prevent the 
unnecessary shuffle.

Action:
To investigate the issue and provide a reasonable explanation, I considered 
several possibilities:

1. Partitioning Requirements:
   I mentioned the possibility that Spark introduced the shuffle to meet its 
internal partitioning requirements for the window function. Although the data 
was already partitioned by `key_col`, Spark might still trigger a shuffle to 
ensure that the data distribution aligns perfectly with the window function's 
needs.

2. Locality and Ordering:
   I considered that Spark might have required a shuffle to enforce global 
sorting within partitions. Even though the data was locally sorted within each 
bucket, Spark could still introduce a shuffle to ensure the window function 
operates correctly across all partitions.

3. Adaptive Query Execution (AQE):
   You inquired whether AQE might have introduced the shuffle to optimize 
performance based on runtime statistics. This is indeed a possibility, as AQE 
can adjust the execution plan dynamically.

4. Compatibility and Partitioning Mismatch:
   There may be a mismatch in partitioning recognition between the join 
operation and the window function. This mismatch could lead Spark to introduce 
a shuffle, even when using the same `key_col`.

Recommendations:
To address these potential causes, I recommend the following steps:

- Check Spark's Understanding of Partitioning: Inspect the DataFrame’s 
partitioning after the join operation to ensure it aligns with expectations.
- Disable AQE Temporarily: Turn off AQE to determine if it was influencing the 
shuffle.
- Force Specific Partitioning: Repartition the DataFrame explicitly by key_co` 
before applying the window function to see if this prevents the shuffle.

HTH


Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD<https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College 
London<https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


 
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
<https://en.wikipedia.org/wiki/Wernher_von_Braun> Von 
Braun<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Fri, 16 Aug 2024 at 14:55, Shay Elbaz wrote:

Re: Redundant(?) shuffle after join

2024-08-16 Thread Mich Talebzadeh
Hi Shay,

Let me address the points you raised using the STAR methodology. I
apologize if it sounds a bit formal, but I find it  effective for clarity.

*Situation*
You encountered an issue while working with a Spark DataFrame where a
shuffle was unexpectedly triggered during the application of a window
function. This happened even though the data was already partitioned and
sorted by the key column (`key_col`). Specifically, the issue arose after
joining a large, bucketed table with a smaller DataFrame on the same column
used in the window function.

*Task:*
Your objective, as evident from your question, was to understand why Spark
introduced a shuffle for the window function despite the data being
pre-partitioned and sorted. In summary, we needed to identify the
underlying cause of this behavior and explore possible solutions to prevent
the unnecessary shuffle.

*Action:*
To investigate the issue and provide a reasonable explanation, I considered
several possibilities:

1. Partitioning Requirements:
   I mentioned the possibility that Spark introduced the shuffle to meet
its internal partitioning requirements for the window function. Although
the data was already partitioned by `key_col`, Spark might still trigger a
shuffle to ensure that the data distribution aligns perfectly with the
window function's needs.

2. Locality and Ordering:
   I considered that Spark might have required a shuffle to enforce global
sorting within partitions. Even though the data was locally sorted within
each bucket, Spark could still introduce a shuffle to ensure the window
function operates correctly across all partitions.

3. Adaptive Query Execution (AQE):
   You inquired whether AQE might have introduced the shuffle to optimize
performance based on runtime statistics. This is indeed a possibility, as
AQE can adjust the execution plan dynamically.

4. Compatibility and Partitioning Mismatch:
   There may be a mismatch in partitioning recognition between the join
operation and the window function. This mismatch could lead Spark to
introduce a shuffle, even when using the same `key_col`.

*Recommendations:*
To address these potential causes, I recommend the following steps:

- Check Spark's Understanding of Partitioning: Inspect the DataFrame’s
partitioning after the join operation to ensure it aligns with expectations.
- Disable AQE Temporarily: Turn off AQE to determine if it was influencing
the shuffle.
- Force Specific Partitioning: Repartition the DataFrame explicitly by
`key_col` before applying the window function to see if this prevents the
shuffle.

HTH


Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Fri, 16 Aug 2024 at 14:55, Shay Elbaz  wrote:

> Hi Mich, thank you for answering - much appreciated.
>
> This can cause uneven distribution of data, triggering a shuffle for the
> window function.
>
> Could you elaborate on the mechanism that can "trigger a shuffle for the
> window function"? I'm not familiar with it. (or are you referring to AQE?)
> In any case, there is no skew - the keys are GUIDs of events. Even if the
> data was skewed, the shuffle would end up exactly the same way as before
> the shuffle - the DF was already partitioned (and locally sorted) by the
> same key.
>
> Thanks again,
>
> Shay
>
>
>
> --
> *From:* Mich Talebzadeh 
> *Sent:* Thursday, August 15, 2024 17:21
> *To:* Shay Elbaz 
> *Cc:* user@spark.apache.org 
> *Subject:* Re: Redundant(?) shuffle after join
>
> This message contains hyperlinks, take precaution before opening these
> links.
> The actual code is not given, so I am going with the plan output and your
> explanation
>
>
>- You're joining a large, bucketed table with a smaller DataFrame on a
>common column (key_col).
>- The subsequent window function also uses key_col
>- However, a shuffle occurs for the window function even though the
>data is already partitioned by key_col
>
>
> Potential data skew, Though
>  the table is bucketed, there might be significant data skew within the
> buckets. This can cause uneven distribution of data, triggering a shuffle
> for the w

Re: Redundant(?) shuffle after join

2024-08-16 Thread Shay Elbaz
Hi Mich, thank you for answering - much appreciated.

This can cause uneven distribution of data, triggering a shuffle for the window 
function.
Could you elaborate on the mechanism that can "trigger a shuffle for the window 
function"? I'm not familiar with it. (or are you referring to AQE?)
In any case, there is no skew - the keys are GUIDs of events. Even if the data 
was skewed, the shuffle would end up exactly the same way as before the shuffle 
- the DF was already partitioned (and locally sorted) by the same key.

Thanks again,

Shay




From: Mich Talebzadeh 
Sent: Thursday, August 15, 2024 17:21
To: Shay Elbaz 
Cc: user@spark.apache.org 
Subject: Re: Redundant(?) shuffle after join

This message contains hyperlinks, take precaution before opening these links.
The actual code is not given, so I am going with the plan output and your 
explanation


  *   You're joining a large, bucketed table with a smaller DataFrame on a 
common column (key_col).
  *   The subsequent window function also uses key_col
  *   However, a shuffle occurs for the window function even though the data is 
already partitioned by key_col


Potential data skew, Though

 the table is bucketed, there might be significant data skew within the 
buckets. This can cause uneven distribution of data, triggering a shuffle for 
the window function.

import pyspark.sql.functions as F
df = spark.table("your_bucketed_table")
df = df.withColumn("approx_count", F.approx_count_distinct("key_col"))
df.groupBy("key_col").agg(F.avg("approx_count").alias("avg_count")).show()


HTH
Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD<https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College 
London<https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


 
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
<https://en.wikipedia.org/wiki/Wernher_von_Braun> Von 
Braun<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 15 Aug 2024 at 14:30, Shay Elbaz  wrote:
Hi Spark community,

Please review the cleansed plan below. It is the result of joining a large, 
bucketed table with a smaller DF, and then applying a window function. Both the 
join and the window function use the same column, which is also the bucket 
column of the table ("key_col" in the plan).
The join results in a map-side-join as expected, but then there is a shuffle 
for the window function, even though the data is already partitioned 
accordingly.

Can anyone explain why?

Using Spark 3.5.0


Thanks,
Shay


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project ...
   +- Filter (rn#5441 = 1)
  +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC 
NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
 +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], 
row_number(), 1, Final
+- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], 
false, 0
   +- Exchange hashpartitioning(key_col#5394, 8), 
ENSURE_REQUIREMENTS, [plan_id=592]
  +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS 
FIRST], row_number(), 1, Partial
 +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS 
FIRST], false, 0
+- Project ...  (key_col stays the same)
   +- Project [coalesce(key_col#0, key_col#5009) AS 
key_col#5394, CASE WHEN ...
  +- SortMergeJoin [key_col#0], [key_col#5009], 
FullOuter
 :- Sort [key_col#0 ASC NULLS FIRST], false, 0
 :  +- Project key_
 : +- FileScan parquet bucketed table ...
 +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key_col#5009, 
8), REPARTITION_BY_NUM, [plan_id=572]
   +- Project
  +- Filter
 +- Scan small table...



Re: Redundant(?) shuffle after join

2024-08-15 Thread Mich Talebzadeh
The actual code is not given, so I am going with the plan output and your
explanation


   - You're joining a large, bucketed table with a smaller DataFrame on a
   common column (key_col).
   - The subsequent window function also uses key_col
   - However, a shuffle occurs for the window function even though the data
   is already partitioned by key_col


Potential data skew: though the table is bucketed, there might be significant
data skew within the buckets. This can cause uneven distribution of data,
triggering a shuffle for the window function.

import pyspark.sql.functions as F

df = spark.table("your_bucketed_table")
# Rows per key: heavily skewed keys show up with outsized counts
key_counts = df.groupBy("key_col").agg(F.count("*").alias("row_count"))
key_counts.orderBy(F.desc("row_count")).show(20)
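
If skew is not the culprit, a small self-contained reproduction can help confirm where
the extra Exchange comes from. The sketch below (local session, made-up column names,
not your actual job) builds a full outer join that coalesces the two join keys and then
windows on the coalesced column; comparing the two explain() outputs shows whether the
Exchange before the Sort/Window is tied to the window being keyed on the derived
coalesce(...) column rather than on one of the original bucketed keys.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = (SparkSession.builder.master("local[4]")
         .appName("window-shuffle-check").getOrCreate())

big = spark.range(0, 1000).select(F.col("id").alias("k1"), F.rand().alias("v"))
small = spark.range(0, 100).select(F.col("id").alias("k2"), F.rand().alias("w"))

joined = (big.join(small, big["k1"] == small["k2"], "full_outer")
              .withColumn("key_col", F.coalesce("k1", "k2")))

w = Window.partitionBy("key_col").orderBy("v")

# Variant 1: window directly on the coalesced key (mirrors the reported plan shape)
joined.withColumn("rn", F.row_number().over(w)).filter("rn = 1").explain()

# Variant 2: make the required distribution explicit before the window, then compare
# where (or whether) an Exchange hashpartitioning(key_col, ...) appears
joined.repartition("key_col") \
      .withColumn("rn", F.row_number().over(w)).filter("rn = 1").explain()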


HTH
Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 15 Aug 2024 at 14:30, Shay Elbaz  wrote:

> Hi Spark community,
>
> Please review the cleansed plan below. It is the result of joining a
> large, bucketed table with a smaller DF, and then applying a window
> function. Both the join and the window function use the same column, which
> is also the bucket column of the table ("key_col" in the plan).
> The join results in a map-side-join as expected, but then there is a
> shuffle for the window function, even though the data is already
> partitioned accordingly.
>
> Can anyone explain why?
>
> Using Spark 3.5.0
>
>
> Thanks,
> Shay
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project ...
>+- Filter (rn#5441 = 1)
>   +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC 
> NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
> currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
>  +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], 
> row_number(), 1, Final
> +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], 
> false, 0
>+- Exchange hashpartitioning(key_col#5394, 8), 
> ENSURE_REQUIREMENTS, [plan_id=592]
>   +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS 
> FIRST], row_number(), 1, Partial
>  +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC 
> NULLS FIRST], false, 0
> +- Project ...  (key_col stays the same)
>+- Project [coalesce(key_col#0, key_col#5009) AS 
> key_col#5394, CASE WHEN ...
>   +- SortMergeJoin [key_col#0], [key_col#5009], 
> FullOuter
>  :- Sort [key_col#0 ASC NULLS FIRST], false, 0
>  :  +- Project key_
>  : +- FileScan parquet bucketed table ...
>  +- Sort [key_col#5009 ASC NULLS FIRST], 
> false, 0
> +- Exchange 
> hashpartitioning(key_col#5009, 8), REPARTITION_BY_NUM, [plan_id=572]
>+- Project
>   +- Filter
>  +- Scan small table...
>
>
>


Re: Need help understanding tuning docs

2024-08-14 Thread Subhasis Mukherjee
You are mixing up storage and execution memory.

Following is the sequence of storage retention/eviction.

- Execution and storage share a unified region (M).
- When no spark execution is underway, storage activity can take up the whole 
of M. This is vice versa for execution activity.
- When both spark execution and storage is underway, there is a priority 
sequence in terms of claiming regions of M that comes into play.
- When spark execution starts, if a portion of M is already occupied by storage 
but is now needed for execution, execution starts evicting storage to reclaim 
the space.
- But this eviction cannot reclaim the whole of M for execution. There is a reserved 
threshold R (a subregion of M) that storage is guaranteed to keep: execution stops 
evicting cached blocks once total storage usage falls to R.
- In short, R is that subregion of M where storage will always have more 
priority than execution and will never be released to execution.
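
To make M and R concrete in configuration terms, they come from two settings of the
unified memory manager: spark.memory.fraction sets M as a fraction of (JVM heap minus
roughly 300 MB of reserved memory), and spark.memory.storageFraction sets R as a
fraction of M. A minimal sketch with the default values (illustrative only, not a
tuning recommendation):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("unified-memory-sketch")
         .config("spark.memory.fraction", "0.6")         # size of M (default 0.6)
         .config("spark.memory.storageFraction", "0.5")  # R as a fraction of M (default 0.5)
         .getOrCreate())

print(spark.conf.get("spark.memory.fraction"))
print(spark.conf.get("spark.memory.storageFraction"))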



Regards,
Subhasis Mukherjee

From: Sreyan Chakravarty 
Sent: Wednesday, August 14, 2024 9:00:45 PM
To: user@spark.apache.org 
Subject: Need help understanding tuning docs

https://spark.apache.org/docs/latest/tuning.html#memory-management-overview

What is the meaning of :
"Execution may evict storage if necessary, but only until total storage memory 
usage falls under a certain threshold (R). In other words, R describes a 
subregion within M where cached blocks are never evicted. "

This seems contradictory. In simple terms, I take it to mean that once total memory
usage crosses a threshold (R), Spark will start evicting storage in an LRU fashion.

But the line:

"In other words, R describes a subregion within M where cached blocks are never 
evicted."

Seems contradictory, what is going on?

--
Regards,
Sreyan Chakravarty


Re: [ANNOUNCE] Apache Spark 3.5.2 released

2024-08-12 Thread Xiao Li
Thank you, Kent!

Kent Yao  于2024年8月12日周一 08:03写道:

> We are happy to announce the availability of Apache Spark 3.5.2!
>
> Spark 3.5.2 is the second maintenance release containing security
> and correctness fixes. This release is based on the branch-3.5
> maintenance branch of Spark. We strongly recommend all 3.5 users
> to upgrade to this stable release.
>
> To download Spark 3.5.2, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-5-2.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
>
> Kent Yao
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: dynamically infer json data not working as expected

2024-08-08 Thread Perez
Also, I checked your code but it will again give the same result even if I
do sampling because the schema of the "data" attribute is not fixed.

Any suggestions?


On Thu, Aug 8, 2024 at 12:34 PM Perez  wrote:

> Hi Mich,
>
> Thanks a lot for your answer but there is one more scenario to it.
>
> The schema of the data attribute inside the steps column is not fixed. For
> some records, I see it as a struct and for others, I see it as an Array of
> objects.
>
> So at last it treats it as string only since it gets confused I guess.
>
> So how can we tackle this?
>
> Thanks,
>
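
One workaround, sketched below with made-up field names and a toy schema (the exact
behaviour of from_json on mismatched shapes can vary slightly between Spark versions,
so verify on your data): keep the ambiguous "data" payload as a raw JSON string, parse
it with an explicit schema both as an array and as a single struct, and normalise
everything to an array.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

spark = SparkSession.builder.master("local[2]").appName("mixed-json").getOrCreate()

rows = [
    ('{"steps": {"data": {"id": "a", "value": "1"}}}',),
    ('{"steps": {"data": [{"id": "b", "value": "2"}, {"id": "c", "value": "3"}]}}',),
]
df = spark.createDataFrame(rows, ["raw"])

elem = StructType([StructField("id", StringType()), StructField("value", StringType())])

# Extract the "data" payload as a JSON string, then try both shapes.
data_str = F.get_json_object("raw", "$.steps.data")
parsed = df.select(
    F.coalesce(
        F.from_json(data_str, ArrayType(elem)),   # array-of-objects case
        F.array(F.from_json(data_str, elem)),     # single-struct case, wrapped into an array
    ).alias("data")
)
parsed.show(truncate=False)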


Re: dynamically infer json data not working as expected

2024-08-08 Thread Perez
Hi Mich,

Thanks a lot for your answer but there is one more scenario to it.

The schema of the data attribute inside the steps column is not fixed. For
some records, I see it as a struct and for others, I see it as an Array of
objects.

So in the end it treats it as a string only, since the inference gets confused, I guess.

So how can we tackle this?

Thanks,


Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Prabodh Agarwal
Glad to help!

On Tue, 6 Aug, 2024, 17:37 Ilango,  wrote:

>
> Thanks Praboth. Passing —master attr in spark connect command worked like
> charm.  I am able to submit spark connect to my existing stand-alone cluster
>
> Thanks for saving my day once again :)
>
> Thanks,
> Elango
>
>
> On Tue, 6 Aug 2024 at 6:08 PM, Prabodh Agarwal 
> wrote:
>
>> Do you get some error on passing the master option to your spark connect
>> command?
>>
>> On Tue, 6 Aug, 2024, 15:36 Ilango,  wrote:
>>
>>>
>>>
>>>
>>> Thanks Prabodh. I'm having an issue with the Spark Connect connection as
>>> the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas
>>> the actual master node for our Spark standalone cluster is different. I am
>>> passing that master node ip in the Spark Connect Connection. But still it
>>> is not set correctly. Could you please help me update this configuration to
>>> reflect the correct master node value?
>>>
>>>
>>>
>>> This is my spark connect connection
>>>
>>>
>>>
>>> spark = SparkSession.builder\
>>>
>>> .remote("sc://:15002")\
>>>
>>> .getOrCreate()
>>>
>>>
>>> Thanks,
>>> Elango
>>>
>>>
>>> On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal 
>>> wrote:
>>>
 There is an executors tab on spark connect. It's contents are generally
 similar to the workers section of the spark master ui.

 You might need to specify --master option in your spark connect command
 if you haven't done so yet.

 On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:

>
> Hi all,
>
> I am evaluating the use of Spark Connect with my Spark stand-alone
> cluster, which has a master node and 3 worker nodes. I have successfully
> created a Spark Connect connection. However, when submitting Spark SQL
> queries, the jobs are being executed only on the master node, and I do not
> observe any executors running on the worker nodes, despite requesting 4
> executors.
>
>
>
> I would appreciate clarification on whether Spark stand-alone cluster
> is supported for use with Spark Connect.
>
> If so, how can I leverage the existing Spark stand-alone cluster's
> worker nodes?
>
>
>
>
>
>
> Thanks,
> Elango
>



Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Ilango
Thanks Prabodh. Passing the --master attribute in the spark connect command worked
like a charm. I am able to submit Spark jobs via Spark Connect to my existing
stand-alone cluster.

Thanks for saving my day once again :)

Thanks,
Elango


On Tue, 6 Aug 2024 at 6:08 PM, Prabodh Agarwal 
wrote:

> Do you get some error on passing the master option to your spark connect
> command?
>
> On Tue, 6 Aug, 2024, 15:36 Ilango,  wrote:
>
>>
>>
>>
>> Thanks Prabodh. I'm having an issue with the Spark Connect connection as
>> the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas
>> the actual master node for our Spark standalone cluster is different. I am
>> passing that master node ip in the Spark Connect Connection. But still it
>> is not set correctly. Could you please help me update this configuration to
>> reflect the correct master node value?
>>
>>
>>
>> This is my spark connect connection
>>
>>
>>
>> spark = SparkSession.builder\
>>
>> .remote("sc://:15002")\
>>
>> .getOrCreate()
>>
>>
>> Thanks,
>> Elango
>>
>>
>> On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal 
>> wrote:
>>
>>> There is an executors tab on spark connect. It's contents are generally
>>> similar to the workers section of the spark master ui.
>>>
>>> You might need to specify --master option in your spark connect command
>>> if you haven't done so yet.
>>>
>>> On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:
>>>

 Hi all,

 I am evaluating the use of Spark Connect with my Spark stand-alone
 cluster, which has a master node and 3 worker nodes. I have successfully
 created a Spark Connect connection. However, when submitting Spark SQL
 queries, the jobs are being executed only on the master node, and I do not
 observe any executors running on the worker nodes, despite requesting 4
 executors.



 I would appreciate clarification on whether Spark stand-alone cluster
 is supported for use with Spark Connect.

 If so, how can I leverage the existing Spark stand-alone cluster's
 worker nodes?






 Thanks,
 Elango

>>>


Re: Spark 3.5.0 bug - Writing a small paraquet dataframe to storage using spark 3.5.0 taking too long

2024-08-06 Thread Bijoy Deb
Hi Spark community,

Any resolution would be highly appreciated.

A few additional findings from my side:

The lag in writing parquet exists in spark 3.5.0, but no lag in spark 3.1.2
or 2.4.5.

Also, I found that the WholeStageCodegen(1) --> ColumnarToRow task is the one
taking the most time (almost 3 mins for a simple 3 MB file) in Spark 3.5.0. The
input batch size of this stage is 10, and the output record count is 30,000. The
same ColumnarToRow task in Spark 3.1.2 finishes in 10 secs.
Further, with spark 3.5.0 if I cache the dataframe and materialise it using
df.count() and then write the df into parquet file, then the ColumnarToRow
gets called twice, first takes 10 secs and second one 3 mins.
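
One way to narrow this down further is to time the same read-and-write with the
vectorized Parquet reader toggled, since ColumnarToRow sits on the columnar scan path.
A rough diagnostic sketch (the paths are placeholders;
spark.sql.parquet.enableVectorizedReader is a standard runtime SQL conf):

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-write-probe").getOrCreate()

def timed_roundtrip(src, dst):
    start = time.time()
    spark.read.parquet(src).write.mode("overwrite").parquet(dst)
    return time.time() - start

for vectorized in ("true", "false"):
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorized)
    elapsed = timed_roundtrip("gs://input_folder/input1/key=20240610",
                              "gs://output_folder/output_probe/key=20240610")
    print(f"vectorizedReader={vectorized}: {elapsed:.1f}s")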

On Wed, 31 Jul, 2024, 10:14 PM Bijoy Deb,  wrote:

> Hi,
>
> We are using Spark on-premise to simply read a parquet file from
> GCS(Google Cloud storage) into the DataFrame and write the DataFrame into
> another folder in parquet format in GCS, using below code:
>
> 
>
> DFS_BLOCKSIZE = 512 * 1024 * 1024
>
>
> spark = SparkSession.builder \
> .appName("test_app_parquet_load") \
>  .config("spark.master", "spark://spark-master-svc:7077") \
> .config("spark.driver.maxResultSize", '1g') \
> .config("spark.driver.memory", '1g') \
>  .config("spark.executor.cores",4) \
> .config("spark.sql.shuffle.partitions", 16) \
>.config("spark.sql.files.maxPartitionBytes", DFS_BLOCKSIZE) \
>
>  .getOrCreate()
>
>
> folder="gs://input_folder/input1/key=20240610"
> print(f"reading parquet from {folder}")
>
> start_time1 = time.time()
>
> data_df = spark.read.parquet(folder)
>
> end_time1 = time.time()
> print(f"Time duration for reading parquet t1: {end_time1 - start_time1}")
>
>
> start_time2 = time.time()
>
> data_df.write.mode("overwrite").parquet("gs://output_folder/output/key=20240610")
>
> end_time2 = time.time()
> print(f"Time duration for writing parquet t3: {end_time2 - start_time2}")
>
> spark.stop()
>
>
>
>
>
>
>
>
> __
>
>
> However, we observed a drastic time difference between Spark 2.4.5 and
> 3.5.0 in the writing process. Even in the case of a local filesystem instead of
> GCS, Spark 3.5.0 is taking a long time.
>
> In Spark 2.4.5, the above code takes about 10 seconds for Parquet to read
> and 20 seconds for write, while in Spark 3.5.0 read takes almost similar
> time but write takes nearly 3 minutes. The size of the file is just 3 MB.
> Further, we have noticed that if we read a CSV file instead of parquet into
> DataFrame and write to another folder in parquet format, Spark 3.5.0 takes
> relatively less time to write, about 30-40 seconds.
>
> So, it looks like only reading a parquet file to a dataframe and writing
> that dataframe to another parquet file is taking too long in the case of
> Spark 3.5.0.
>
> We are seeing that there is no slowness even with Spark 3.1.2. So, it
> seems that the issue with spark job taking too long to write a parquet
> based dataframe into another parquet file (in gcs or local filesystem both)
> is specific to spark 3.5.0. Looks to be either a potential bug in Spark
> 3.5.0 or some parquet related configuration that is not clearly documented.
> Any help in this regard would be highly appreciated.
>
>
> Thanks,
>
> Bijoy
>


Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Prabodh Agarwal
Do you get some error on passing the master option to your spark connect
command?

On Tue, 6 Aug, 2024, 15:36 Ilango,  wrote:

>
>
>
> Thanks Prabodh. I'm having an issue with the Spark Connect connection as
> the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas
> the actual master node for our Spark standalone cluster is different. I am
> passing that master node ip in the Spark Connect Connection. But still it
> is not set correctly. Could you please help me update this configuration to
> reflect the correct master node value?
>
>
>
> This is my spark connect connection
>
>
>
> spark = SparkSession.builder\
>
> .remote("sc://:15002")\
>
> .getOrCreate()
>
>
> Thanks,
> Elango
>
>
> On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal 
> wrote:
>
>> There is an executors tab on spark connect. It's contents are generally
>> similar to the workers section of the spark master ui.
>>
>> You might need to specify --master option in your spark connect command
>> if you haven't done so yet.
>>
>> On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:
>>
>>>
>>> Hi all,
>>>
>>> I am evaluating the use of Spark Connect with my Spark stand-alone
>>> cluster, which has a master node and 3 worker nodes. I have successfully
>>> created a Spark Connect connection. However, when submitting Spark SQL
>>> queries, the jobs are being executed only on the master node, and I do not
>>> observe any executors running on the worker nodes, despite requesting 4
>>> executors.
>>>
>>>
>>>
>>> I would appreciate clarification on whether Spark stand-alone cluster is
>>> supported for use with Spark Connect.
>>>
>>> If so, how can I leverage the existing Spark stand-alone cluster's
>>> worker nodes?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Thanks,
>>> Elango
>>>
>>


Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Ilango
Thanks Prabodh. I'm having an issue with the Spark Connect connection as
the `spark.master` value is set to `local[*]` in Spark Connect UI, whereas
the actual master node for our Spark standalone cluster is different. I am
passing that master node ip in the Spark Connect Connection. But still it
is not set correctly. Could you please help me update this configuration to
reflect the correct master node value?



This is my spark connect connection



spark = SparkSession.builder\

.remote("sc://:15002")\

.getOrCreate()


Thanks,
Elango


On Tue, 6 Aug 2024 at 5:45 PM, Prabodh Agarwal 
wrote:

> There is an executors tab on spark connect. It's contents are generally
> similar to the workers section of the spark master ui.
>
> You might need to specify --master option in your spark connect command if
> you haven't done so yet.
>
> On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:
>
>>
>> Hi all,
>>
>> I am evaluating the use of Spark Connect with my Spark stand-alone
>> cluster, which has a master node and 3 worker nodes. I have successfully
>> created a Spark Connect connection. However, when submitting Spark SQL
>> queries, the jobs are being executed only on the master node, and I do not
>> observe any executors running on the worker nodes, despite requesting 4
>> executors.
>>
>>
>>
>> I would appreciate clarification on whether Spark stand-alone cluster is
>> supported for use with Spark Connect.
>>
>> If so, how can I leverage the existing Spark stand-alone cluster's worker
>> nodes?
>>
>>
>>
>>
>>
>>
>> Thanks,
>> Elango
>>
>


Re: [spark connect] unable to utilize stand alone cluster

2024-08-06 Thread Prabodh Agarwal
There is an executors tab on spark connect. It's contents are generally
similar to the workers section of the spark master ui.

You might need to specify --master option in your spark connect command if
you haven't done so yet.

On Tue, 6 Aug, 2024, 14:19 Ilango,  wrote:

>
> Hi all,
>
> I am evaluating the use of Spark Connect with my Spark stand-alone
> cluster, which has a master node and 3 worker nodes. I have successfully
> created a Spark Connect connection. However, when submitting Spark SQL
> queries, the jobs are being executed only on the master node, and I do not
> observe any executors running on the worker nodes, despite requesting 4
> executors.
>
>
>
> I would appreciate clarification on whether Spark stand-alone cluster is
> supported for use with Spark Connect.
>
> If so, how can I leverage the existing Spark stand-alone cluster's worker
> nodes?
>
>
>
>
>
>
> Thanks,
> Elango
>


Re: dynamically infer json data not working as expected

2024-08-05 Thread Mich Talebzadeh
I gave an answer in SO

HTH

Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 5 Aug 2024 at 22:53, Perez  wrote:

> Hello everyone,
>
> I have described my problem on the SO blog :
>
>


Re: dynamically infer json data not working as expected

2024-08-05 Thread Perez
https://stackoverflow.com/questions/78835509/dynamically-infer-schema-of-json-data-using-pyspark

Any help would be appreciated.

Thanks,

On Mon, Aug 5, 2024 at 10:35 PM Perez  wrote:

> Hello everyone,
>
> I have described my problem on the SO blog :
>
>


Re: [Issue] Spark SQL - broadcast failure

2024-08-01 Thread Sudharshan V
Hi all,
Do we have any idea on this.

Thanks

On Tue, 23 Jul, 2024, 12:54 pm Sudharshan V, 
wrote:

> We removed the explicit broadcast for that particular table and it took
> longer time since the join type changed from BHJ to SMJ.
>
> I wanted to understand how I can find what went wrong with the broadcast
> now.
> How do I know the size of the table inside of spark memory.
>
> I have tried to cache the table hoping I could see the table size in the
> storage tab of spark UI of EMR.
>
> But I see no data there .
>
> Thanks
>
> On Tue, 23 Jul, 2024, 12:48 pm Sudharshan V, 
> wrote:
>
>> Hi all, apologies for the delayed response.
>>
>> We are using spark version 3.4.1 in jar and EMR 6.11 runtime.
>>
>> We have disabled the auto broadcast always and would broadcast the
>> smaller tables using explicit broadcast.
>>
>> It was working fine historically and only now it is failing.
>>
>> The data sizes I mentioned was taken from S3.
>>
>> Thanks,
>> Sudharshan
>>
>> On Wed, 17 Jul, 2024, 1:53 am Meena Rajani, 
>> wrote:
>>
>>> Can you try disabling broadcast join and see what happens?
>>>
>>> On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V 
>>> wrote:
>>>
 Hi all,

 Been facing a weird issue lately.
 In our production code base , we have an explicit broadcast for a small
 table.
 It is just a look up table that is around 1gb in size in s3 and just
 had few million records and 5 columns.

 The ETL was running fine , but with no change from the codebase nor the
 infrastructure, we are getting broadcast failures. Even weird fact is the
 older size of the data is 1.4gb while for the new run is just 900 MB

 Below is the error message
 Cannot broadcast table that is larger than 8 GB : 8GB.

 I find it extremely weird considering that the data size is very well
 under the thresholds.

 Are there any other ways to find what could be the issue and how we can
 rectify this issue?

 Could the data characteristics be an issue?

 Any help would be immensely appreciated.

 Thanks

>>>


Re: [Spark Connect] connection issue

2024-07-29 Thread Prabodh Agarwal
Glad it worked!

On Tue, 30 Jul, 2024, 11:12 Ilango,  wrote:

>
> Thanks Prabodh. I copied the spark connect jar to  $SPARK_HOME/jars
> folder.  And passed the location as —jars attr. Its working now. I could
> submit spark jobs via spark connect.
>
> Really appreciate the help.
>
>
>
> Thanks,
> Elango
>
>
> On Tue, 30 Jul 2024 at 11:05 AM, Prabodh Agarwal 
> wrote:
>
>> Yeah. I understand the problem. One of the ways is to actually place the
>> spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark
>> connect. Using the `--packages` or the `--jars` option is flaky in case of
>> spark connect.
>>
>> You can instead manually place the relevant spark connect jar file in the
>> `$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars`
>> option from your start command.
>>
>> On Mon, Jul 29, 2024 at 7:01 PM Ilango  wrote:
>>
>>>
>>> Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs
>>> path. It seems like the spark connect dependency issue. My spark node is
>>> air gapped node so no internet is allowed. Can I download the spark connect
>>> jar and pom files locally and share the local paths? How can I share the
>>> local jars ?
>>>
>>> Error message:
>>>
>>> :: problems summary ::
>>>
>>>  WARNINGS
>>>
>>> module not found:
>>> org.apache.spark#spark-connect_2.12;3.5.1
>>>
>>>
>>>
>>>  local-m2-cache: tried
>>>
>>>
>>>
>>>
>>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>
>>>
>>>
>>>   -- artifact
>>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>>
>>>
>>>
>>>
>>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>>
>>>
>>>
>>>  local-ivy-cache: tried
>>>
>>>
>>>
>>>
>>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml
>>>
>>>
>>>
>>>   -- artifact
>>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>>
>>>
>>>
>>>
>>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar
>>>
>>>
>>>
>>>  central: tried
>>>
>>>
>>>
>>>
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>
>>>
>>>
>>>   -- artifact
>>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>>
>>>
>>>
>>>
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>>
>>>
>>>
>>>  spark-packages: tried
>>>
>>>
>>>
>>>
>>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>
>>>
>>>
>>>   -- artifact
>>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>>
>>>
>>>
>>>
>>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>>
>>>
>>>
>>> ::
>>>
>>>
>>>
>>> ::  UNRESOLVED DEPENDENCIES ::
>>>
>>>
>>>
>>> ::
>>>
>>>
>>>
>>> :: org.apache.spark#spark-connect_2.12;3.5.1: not found
>>>
>>>
>>>
>>> ::
>>>
>>>
>>>
>>>
>>>
>>>  ERRORS
>>>
>>> Server access error at url
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>  (java.net.ConnectException:
>>> Connection timed out (Connection timed out))
>>>
>>>
>>>
>>> Server access error at url
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
>>> Connection timed out (Connection timed out))
>>>
>>>
>>>
>>> Server access error at url
>>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>>  (java.net.ConnectException:
>>> Connection timed out (Connection timed out))
>>>
>>>
>>>
>>> Server access error at url
>>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
>>> Connection timed out (Connection timed out))
>>>
>>>
>>>
>>>
>>>
>>> :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
>>>
>>> Exception in thread "main" java.lang.RuntimeException: [unresolved
>>> dependency: org.apache.spark#spark-connect_2.12;3.5.1: not found]
>>>
>>> at
>>> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1608)
>>>
>>> at
>>> org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)
>>>
>>> at
>>> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334)
>>>
>>> at org.apache.spark.deploy.SparkSubmit.org
>>> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
>>>
>>>

Re: [Spark Connect] connection issue

2024-07-29 Thread Ilango
Thanks Prabodh. I copied the spark connect jar to the $SPARK_HOME/jars
folder and passed the location as the --jars attribute. It's working now. I could
submit Spark jobs via Spark Connect.

Really appreciate the help.



Thanks,
Elango


On Tue, 30 Jul 2024 at 11:05 AM, Prabodh Agarwal 
wrote:

> Yeah. I understand the problem. One of the ways is to actually place the
> spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark
> connect. Using the `--packages` or the `--jars` option is flaky in case of
> spark connect.
>
> You can instead manually place the relevant spark connect jar file in the
> `$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars`
> option from your start command.
>
> On Mon, Jul 29, 2024 at 7:01 PM Ilango  wrote:
>
>>
>> Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs
>> path. It seems like the spark connect dependency issue. My spark node is
>> air gapped node so no internet is allowed. Can I download the spark connect
>> jar and pom files locally and share the local paths? How can I share the
>> local jars ?
>>
>> Error message:
>>
>> :: problems summary ::
>>
>>  WARNINGS
>>
>> module not found:
>> org.apache.spark#spark-connect_2.12;3.5.1
>>
>>
>>
>>  local-m2-cache: tried
>>
>>
>>
>>
>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>
>>
>>
>>   -- artifact
>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>
>>
>>
>>
>> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>
>>
>>
>>  local-ivy-cache: tried
>>
>>
>>
>>
>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml
>>
>>
>>
>>   -- artifact
>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>
>>
>>
>>
>> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar
>>
>>
>>
>>  central: tried
>>
>>
>>
>>
>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>
>>
>>
>>   -- artifact
>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>
>>
>>
>>
>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>
>>
>>
>>  spark-packages: tried
>>
>>
>>
>>
>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>
>>
>>
>>   -- artifact
>> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>>
>>
>>
>>
>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>>
>>
>>
>> ::
>>
>>
>>
>> ::  UNRESOLVED DEPENDENCIES ::
>>
>>
>>
>> ::
>>
>>
>>
>> :: org.apache.spark#spark-connect_2.12;3.5.1: not found
>>
>>
>>
>> ::
>>
>>
>>
>>
>>
>>  ERRORS
>>
>> Server access error at url
>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>  (java.net.ConnectException:
>> Connection timed out (Connection timed out))
>>
>>
>>
>> Server access error at url
>> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
>> Connection timed out (Connection timed out))
>>
>>
>>
>> Server access error at url
>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>>  (java.net.ConnectException:
>> Connection timed out (Connection timed out))
>>
>>
>>
>> Server access error at url
>> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
>> Connection timed out (Connection timed out))
>>
>>
>>
>>
>>
>> :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
>>
>> Exception in thread "main" java.lang.RuntimeException: [unresolved
>> dependency: org.apache.spark#spark-connect_2.12;3.5.1: not found]
>>
>> at
>> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1608)
>>
>> at
>> org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334)
>>
>> at org.apache.spark.deploy.SparkSubmit.org
>> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
>>
>>  

Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread Meena Rajani
You probably have to increase jvm/jdk memory size

https://stackoverflow.com/questions/1565388/increase-heap-size-in-java
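
If the heap really is the limit, in local-mode PySpark the driver memory has to be set
before the first SparkSession is created (once the JVM is up, spark.driver.memory is
fixed). A minimal sketch, with an example value:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("bigger-driver-heap")
         .config("spark.driver.memory", "4g")   # example value, adjust to the machine
         .getOrCreate())

print(spark.sparkContext.getConf().get("spark.driver.memory"))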


On Mon, Jul 29, 2024 at 9:36 PM mike Jadoo  wrote:

> Thanks.   I just downloaded the corretto  but I got this error message,
> which was the same as before. [It was shared with me that this saying that
> I have limited resources, i think]
>
> ---Py4JJavaError
>  Traceback (most recent call last)
> Cell In[3], line 13  8 squared_rdd = rdd.map(lambda x: x * x) 10 # 
> Persist the DataFrame in memory 11 
> #squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the results 
> into a list---> 13 result = squared_rdd.collect() 15 # Print the result   
>   16 print(result)
>
> File 
> C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833, 
> in RDD.collect(self)   1831 with SCCallSiteSync(self.context):   1832 
> assert self.ctx._jvm is not None-> 1833 sock_info = 
> self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())   1834 return 
> list(_load_from_socket(sock_info, self._jrdd_deserializer))
>
> File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in 
> JavaMember.__call__(self, *args)   1316 command = proto.CALL_COMMAND_NAME +\  
>  1317 self.command_header +\   1318 args_command +\   1319 
> proto.END_COMMAND_PART   1321 answer = 
> self.gateway_client.send_command(command)-> 1322 return_value = 
> get_return_value(   1323 answer, self.gateway_client, self.target_id, 
> self.name)   1325 for temp_arg in temp_args:   1326 if hasattr(temp_arg, 
> "_detach"):
>
> File 
> C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179,
>  in capture_sql_exception..deco(*a, **kw)177 def deco(*a: Any, 
> **kw: Any) -> Any:178 try:--> 179 return f(*a, **kw)180   
>   except Py4JJavaError as e:181 converted = 
> convert_exception(e.java_exception)
>
> File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in 
> get_return_value(answer, gateway_client, target_id, name)324 value = 
> OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if answer[1] == 
> REFERENCE_TYPE:--> 326 raise Py4JJavaError(327 "An error 
> occurred while calling {0}{1}{2}.\n".328 format(target_id, ".", 
> name), value)329 else:330 raise Py4JError(331 "An 
> error occurred while calling {0}{1}{2}. Trace:\n{3}\n".332 
> format(target_id, ".", name, value))
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 
> in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 
> (TID 7) (mjadoo.myfiosgateway.com executor driver): java.io.IOException: 
> Cannot run program "C:\Users\mikej\AppData\Local\Programs\Python\Python312": 
> CreateProcess error=5, Access is denied
>   at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
>   at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
>   at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
>   at 
> org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
>   at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
>   at org.apache.spark.scheduler.Task.run(Task.scala:141)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
>   at 
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
>   at 
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.io.IOException: CreateProcess error=5, Access is denied
>   at java.base/java.lang.ProcessImpl.create(Native Method)
>   at java.base/java.lang.ProcessImpl.(ProcessImpl.java:492)
>   at java.base/java.l

Re: [Spark Connect] connection issue

2024-07-29 Thread Prabodh Agarwal
Yeah. I understand the problem. One of the ways is to actually place the
spark connect jar in the $SPARK_HOME/jars folder. That is how we run spark
connect. Using the `--packages` or the `--jars` option is flaky in case of
spark connect.

You can instead manually place the relevant spark connect jar file in the
`$SPARK_HOME/jars` directory and remove the `--packages` or the `--jars`
option from your start command.

On Mon, Jul 29, 2024 at 7:01 PM Ilango  wrote:

>
> Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs
> path. It seems like the spark connect dependency issue. My spark node is
> air gapped node so no internet is allowed. Can I download the spark connect
> jar and pom files locally and share the local paths? How can I share the
> local jars ?
>
> Error message:
>
> :: problems summary ::
>
>  WARNINGS
>
> module not found: org.apache.spark#spark-connect_2.12;3.5.1
>
>
>
>  local-m2-cache: tried
>
>
>
>
> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>
>
>
>   -- artifact
> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>
>
>
>
> file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>
>
>
>  local-ivy-cache: tried
>
>
>
>
> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml
>
>
>
>   -- artifact
> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>
>
>
>
> /root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar
>
>
>
>  central: tried
>
>
>
>
> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>
>
>
>   -- artifact
> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>
>
>
>
> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>
>
>
>  spark-packages: tried
>
>
>
>
> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>
>
>
>   -- artifact
> org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:
>
>
>
>
> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar
>
>
>
> ::
>
>
>
> ::  UNRESOLVED DEPENDENCIES ::
>
>
>
> ::
>
>
>
> :: org.apache.spark#spark-connect_2.12;3.5.1: not found
>
>
>
> ::
>
>
>
>
>
>  ERRORS
>
> Server access error at url
> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>  (java.net.ConnectException:
> Connection timed out (Connection timed out))
>
>
>
> Server access error at url
> https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
> Connection timed out (Connection timed out))
>
>
>
> Server access error at url
> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
>  (java.net.ConnectException:
> Connection timed out (Connection timed out))
>
>
>
> Server access error at url
> https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
> Connection timed out (Connection timed out))
>
>
>
>
>
> :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
>
> Exception in thread "main" java.lang.RuntimeException: [unresolved
> dependency: org.apache.spark#spark-connect_2.12;3.5.1: not found]
>
> at
> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1608)
>
> at
> org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)
>
> at
> org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334)
>
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
>
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
>
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
>
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
>
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
>
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
>
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
>
>
> Thanks,
> Elango
>
>
> On Mon, 29 Jul 2024 at 6:45 PM, Prabodh Agarwal 
> wrote:
>
>> The spark connect startup prints the log location. Is that not feasible
>> for you?
>> For me log comes to $SPARK_HOME/logs
>>
>> On M

Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread Sadha Chilukoori
Hi Mike,

This appears to be an access issue on Windows + Python. Can you try setting
up the PYTHON_PATH environment variable as described in this stackoverflow
post
https://stackoverflow.com/questions/60414394/createprocess-error-5-access-is-denied-pyspark

- Sadha
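
For reference, the environment variables Spark's Python worker launcher actually reads
are PYSPARK_PYTHON (and optionally PYSPARK_DRIVER_PYTHON), and they should point at the
python.exe file itself rather than the folder; the traceback above shows a directory
path, which CreateProcess cannot execute. A hedged sketch (the path is an example
modelled on the traceback, not necessarily the right one for this machine):

import os
from pyspark.sql import SparkSession

# Set before the SparkSession (and hence the Python workers) is created.
os.environ["PYSPARK_PYTHON"] = r"C:\Users\mikej\AppData\Local\Programs\Python\Python312\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = os.environ["PYSPARK_PYTHON"]

spark = SparkSession.builder.master("local[*]").appName("worker-python-check").getOrCreate()
print(spark.sparkContext.parallelize(range(8)).map(lambda x: x * x).collect())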

On Mon, Jul 29, 2024 at 3:39 PM mike Jadoo  wrote:

> Thanks.   I just downloaded the corretto  but I got this error message,
> which was the same as before. [It was shared with me that this saying that
> I have limited resources, i think]
>
> ---Py4JJavaError
>  Traceback (most recent call last)
> Cell In[3], line 13  8 squared_rdd = rdd.map(lambda x: x * x) 10 # 
> Persist the DataFrame in memory 11 
> #squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the results 
> into a list---> 13 result = squared_rdd.collect() 15 # Print the result   
>   16 print(result)
>
> File 
> C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833, 
> in RDD.collect(self)   1831 with SCCallSiteSync(self.context):   1832 
> assert self.ctx._jvm is not None-> 1833 sock_info = 
> self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())   1834 return 
> list(_load_from_socket(sock_info, self._jrdd_deserializer))
>
> File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in 
> JavaMember.__call__(self, *args)   1316 command = proto.CALL_COMMAND_NAME +\  
>  1317 self.command_header +\   1318 args_command +\   1319 
> proto.END_COMMAND_PART   1321 answer = 
> self.gateway_client.send_command(command)-> 1322 return_value = 
> get_return_value(   1323 answer, self.gateway_client, self.target_id, 
> self.name)   1325 for temp_arg in temp_args:   1326 if hasattr(temp_arg, 
> "_detach"):
>
> File 
> C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179,
>  in capture_sql_exception..deco(*a, **kw)177 def deco(*a: Any, 
> **kw: Any) -> Any:178 try:--> 179 return f(*a, **kw)180   
>   except Py4JJavaError as e:181 converted = 
> convert_exception(e.java_exception)
>
> File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in 
> get_return_value(answer, gateway_client, target_id, name)324 value = 
> OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if answer[1] == 
> REFERENCE_TYPE:--> 326 raise Py4JJavaError(327 "An error 
> occurred while calling {0}{1}{2}.\n".328 format(target_id, ".", 
> name), value)329 else:330 raise Py4JError(331 "An 
> error occurred while calling {0}{1}{2}. Trace:\n{3}\n".332 
> format(target_id, ".", name, value))
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 
> in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 
> (TID 7) (mjadoo.myfiosgateway.com executor driver): java.io.IOException: 
> Cannot run program "C:\Users\mikej\AppData\Local\Programs\Python\Python312": 
> CreateProcess error=5, Access is denied
>   at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
>   at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
>   at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
>   at 
> org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
>   at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
>   at org.apache.spark.scheduler.Task.run(Task.scala:141)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
>   at 
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
>   at 
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.io.IOException: CreateProcess error=5, Access is denied
>   at jav

Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread mike Jadoo
Thanks. I just downloaded Corretto, but I got this error message,
which was the same as before. [It was shared with me that this is saying that
I have limited resources, I think.]

---Py4JJavaError
Traceback (most recent call last)
Cell In[3], line 13  8 squared_rdd = rdd.map(lambda x: x * x)
10 # Persist the DataFrame in memory 11
#squared_rdd.persist(StorageLevel.MEMORY_ONLY) 12 # Collect the
results into a list---> 13 result = squared_rdd.collect() 15 #
Print the result 16 print(result)

File 
C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\rdd.py:1833,
in RDD.collect(self)   1831 with SCCallSiteSync(self.context):   1832
   assert self.ctx._jvm is not None-> 1833 sock_info =
self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())   1834
return list(_load_from_socket(sock_info, self._jrdd_deserializer))

File ~\anaconda3\Lib\site-packages\py4j\java_gateway.py:1322, in
JavaMember.__call__(self, *args)   1316 command =
proto.CALL_COMMAND_NAME +\   1317 self.command_header +\   1318
 args_command +\   1319 proto.END_COMMAND_PART   1321 answer =
self.gateway_client.send_command(command)-> 1322 return_value =
get_return_value(   1323 answer, self.gateway_client,
self.target_id, self.name)   1325 for temp_arg in temp_args:   1326
 if hasattr(temp_arg, "_detach"):

File 
C:\spark\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py:179,
in capture_sql_exception..deco(*a, **kw)177 def deco(*a:
Any, **kw: Any) -> Any:178 try:--> 179 return f(*a,
**kw)180 except Py4JJavaError as e:181 converted =
convert_exception(e.java_exception)

File ~\anaconda3\Lib\site-packages\py4j\protocol.py:326, in
get_return_value(answer, gateway_client, target_id, name)324 value
= OUTPUT_CONVERTER[type](answer[2:], gateway_client)325 if
answer[1] == REFERENCE_TYPE:--> 326 raise Py4JJavaError(327
 "An error occurred while calling {0}{1}{2}.\n".328
format(target_id, ".", name), value)329 else:330 raise
Py4JError(331 "An error occurred while calling {0}{1}{2}.
Trace:\n{3}\n".332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0
in stage 0.0 (TID 7) (mjadoo.myfiosgateway.com executor driver):
java.io.IOException: Cannot run program
"C:\Users\mikej\AppData\Local\Programs\Python\Python312":
CreateProcess error=5, Access is denied
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at 
org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: CreateProcess error=5, Access is denied
at java.base/java.lang.ProcessImpl.create(Native Method)
at java.base/java.lang.ProcessImpl.(ProcessImpl.java:492)
at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:153)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
... 19 more

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGS

Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread Sadha Chilukoori
Hi Mike,

I'm not sure about the minimum requirements of a machine for running Spark.
But to run some PySpark scripts (and Jupyter notebooks) on a local
machine, I found the following steps the easiest.


I installed Amazon corretto and updated the java_home variable as
instructed here
https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list.html
(Any other java works too, I'm used to corretto from work).

Then I installed the PySpark module using pip, which enabled me to run PySpark
on my machine.

-Sadha
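
A minimal smoke test (nothing machine-specific assumed beyond the pip install and a
working JDK) to confirm the setup end to end:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[2]")
         .appName("install-check")
         .getOrCreate())

print("Spark version:", spark.version)
print(spark.range(10).selectExpr("sum(id) AS total").collect())
spark.stop()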

On Mon, Jul 29, 2024, 12:51 PM mike Jadoo  wrote:

> Hello,
>
> I am trying to run Pyspark on my computer without success.  I follow
> several different directions from online sources and it appears that I need
> to get a faster computer.
>
> I wanted to ask what are some recommendations for computer specifications
> to run PySpark (Apache Spark).
>
> Any help would be greatly appreciated.
>
> Thank you,
>
> Mike
>


Re: [Spark Connect] connection issue

2024-07-29 Thread Ilango
Thanks Prabodh, Yes I can see the spark connect logs in $SPARK_HOME/logs
path. It seems like the spark connect dependency issue. My spark node is
air gapped node so no internet is allowed. Can I download the spark connect
jar and pom files locally and share the local paths? How can I share the
local jars ?

Error message:

:: problems summary ::

 WARNINGS

module not found: org.apache.spark#spark-connect_2.12;3.5.1



 local-m2-cache: tried




file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom



  -- artifact
org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:




file:/root/.m2/repository/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar



 local-ivy-cache: tried




/root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/ivys/ivy.xml



  -- artifact
org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:




/root/.ivy2/local/org.apache.spark/spark-connect_2.12/3.5.1/jars/spark-connect_2.12.jar



 central: tried




https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom



  -- artifact
org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:




https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar



 spark-packages: tried




https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom



  -- artifact
org.apache.spark#spark-connect_2.12;3.5.1!spark-connect_2.12.jar:




https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar



::



::  UNRESOLVED DEPENDENCIES ::



::



:: org.apache.spark#spark-connect_2.12;3.5.1: not found



::





 ERRORS

Server access error at url
https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
(java.net.ConnectException:
Connection timed out (Connection timed out))



Server access error at url
https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
Connection timed out (Connection timed out))



Server access error at url
https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.pom
(java.net.ConnectException:
Connection timed out (Connection timed out))



Server access error at url
https://repos.spark-packages.org/org/apache/spark/spark-connect_2.12/3.5.1/spark-connect_2.12-3.5.1.jar(java.net.ConnectException:
Connection timed out (Connection timed out))





:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS

Exception in thread "main" java.lang.RuntimeException: [unresolved
dependency: org.apache.spark#spark-connect_2.12;3.5.1: not found]

at
org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1608)

at
org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)

at
org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:334)

at org.apache.spark.deploy.SparkSubmit.org
$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)

at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)

at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)

at
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)

at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)





Thanks,
Elango


On Mon, 29 Jul 2024 at 6:45 PM, Prabodh Agarwal 
wrote:

> The spark connect startup prints the log location. Is that not feasible
> for you?
> For me log comes to $SPARK_HOME/logs
>
> On Mon, 29 Jul, 2024, 15:30 Ilango,  wrote:
>
>>
>> Hi all,
>>
>>
>> I am facing issues with a Spark Connect application running on a Spark
>> standalone cluster (without YARN and HDFS). After executing the
>> start-connect-server.sh script with the specified packages, I observe a
>> process ID for a short period but am unable to see the corresponding port
>> (default 15002) associated with that PID. The process automatically stops
>> after around 10 minutes.
>>
>> Since the Spark History server is not enabled, I am unable to locate the
>> relevant logs or error messages. The logs for currently running Spark
>> applications are accessible from the Spark UI, but I am unsure where to
>> find the logs for the Spark Connect application an

Re: [Spark Connect] connection issue

2024-07-29 Thread Prabodh Agarwal
The spark connect startup prints the log location. Is that not feasible for
you?
For me log comes to $SPARK_HOME/logs

On Mon, 29 Jul, 2024, 15:30 Ilango,  wrote:

>
> Hi all,
>
>
> I am facing issues with a Spark Connect application running on a Spark
> standalone cluster (without YARN and HDFS). After executing the
> start-connect-server.sh script with the specified packages, I observe a
> process ID for a short period but am unable to see the corresponding port
> (default 15002) associated with that PID. The process automatically stops
> after around 10 minutes.
>
> Since the Spark History server is not enabled, I am unable to locate the
> relevant logs or error messages. The logs for currently running Spark
> applications are accessible from the Spark UI, but I am unsure where to
> find the logs for the Spark Connect application and service.
>
> Could you please advise on where to find the logs or error messages
> related to Spark Connect?
>
>
>
>
> Thanks,
> Elango
>
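
A quick way to confirm whether the Connect endpoint is actually listening is to
open a remote session against it (a sketch, not from the thread; it assumes
PySpark 3.4+ with the connect extras installed, i.e. pip install "pyspark[connect]",
and the default port 15002):

from pyspark.sql import SparkSession

# connects over gRPC to a running Spark Connect server; this fails fast if the
# server process has already exited or never bound the port
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()
spark.stop()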


Re: Issue with comparing structs (possible bug)

2024-07-26 Thread Dhruv Singla
The spark version 3.5.1

On Fri, Jul 26, 2024 at 6:54 PM Dhruv Singla  wrote:

> Hey Everyone
>
> Hope you are doing well
>
> I am trying to compare structs with structs using the IN clause. Here is
> what I found.
> The following query comparing structs gives error
>
> SELECT struct(1, 2) IN (
> SELECT struct(c1, c2)
> FROM (VALUES (1, 2), (3, 4)) AS t(c1, c2)
> );
>
> Error - Cannot resolve "(named_struct('1', 1, '2', 2) IN (listquery()))"
> due to data type mismatch: The number of columns in the left hand side of
> an IN subquery does not match the number of columns in the output of
> subquery. *Left hand side columns(length: 2)*: ["1", "2"], *right hand
> side columns(length: 1)*: ["struct(c1, c2)"]. However, if I specify
> named_struct instead of a struct, then it works fine.
>
> SELECT named_struct('1', 1, '2', 2) IN (
> SELECT struct(c1, c2)
> FROM (VALUES (1, 2), (3, 4)) AS t(c1, c2)
> );
>
> +---------------------------------------------+
> | (named_struct(1, 1, 2, 2) IN (listquery())) |
> +---------------------------------------------+
> | true                                        |
> +---------------------------------------------+
>
> What is happening here?
>
> Thanks & Regards
> Dhruv
>
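
A minimal PySpark repro of the two queries above (a sketch; it assumes Spark
3.5.x, where the struct() form raises the data-type-mismatch error quoted in
this thread while the named_struct() form succeeds):

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.getOrCreate()

failing = """
SELECT struct(1, 2) IN (
  SELECT struct(c1, c2) FROM (VALUES (1, 2), (3, 4)) AS t(c1, c2)
) AS result
"""
working = """
SELECT named_struct('1', 1, '2', 2) IN (
  SELECT struct(c1, c2) FROM (VALUES (1, 2), (3, 4)) AS t(c1, c2)
) AS result
"""

try:
    spark.sql(failing).show()      # struct() literal: reported to fail
except AnalysisException as e:
    print("struct() form failed:", str(e).splitlines()[0])

spark.sql(working).show()          # named_struct() literal: reported to work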


Re: [Issue] Spark SQL - broadcast failure

2024-07-23 Thread Sudharshan V
We removed the explicit broadcast for that particular table and the job took
longer, since the join type changed from BHJ (broadcast hash join) to SMJ
(sort-merge join).

I wanted to understand how I can find out what went wrong with the broadcast
now. How do I know the size of the table inside Spark memory?

I have tried to cache the table, hoping I could see the table size in the
Storage tab of the Spark UI on EMR, but I see no data there.

Thanks
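
One way (a sketch, not from the thread, assuming Spark 3.x and a hypothetical
S3 path) to see Spark's own size estimate for the lookup table, which is the
number it compares against broadcast limits, rather than the raw size in S3:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical path standing in for the real lookup table location
lookup = spark.read.parquet("s3://my-bucket/lookup_table/")

# mode="cost" prints the optimized plan with Statistics(sizeInBytes=...),
# i.e. the estimated in-memory size used for broadcast decisions
lookup.explain(mode="cost")

# current broadcast threshold (-1 means auto-broadcast is disabled)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))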

On Tue, 23 Jul, 2024, 12:48 pm Sudharshan V, 
wrote:

> Hi all, apologies for the delayed response.
>
> We are using spark version 3.4.1 in jar and EMR 6.11 runtime.
>
> We have disabled the auto broadcast always and would broadcast the smaller
> tables using explicit broadcast.
>
> It was working fine historically and only now it is failing.
>
> The data sizes I mentioned was taken from S3.
>
> Thanks,
> Sudharshan
>
> On Wed, 17 Jul, 2024, 1:53 am Meena Rajani, 
> wrote:
>
>> Can you try disabling broadcast join and see what happens?
>>
>> On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V 
>> wrote:
>>
>>> Hi all,
>>>
>>> Been facing a weird issue lately.
>>> In our production code base , we have an explicit broadcast for a small
>>> table.
>>> It is just a look up table that is around 1gb in size in s3 and just had
>>> few million records and 5 columns.
>>>
>>> The ETL was running fine , but with no change from the codebase nor the
>>> infrastructure, we are getting broadcast failures. Even weird fact is the
>>> older size of the data is 1.4gb while for the new run is just 900 MB
>>>
>>> Below is the error message
>>> Cannot broadcast table that is larger than 8 GB : 8GB.
>>>
>>> I find it extremely weird considering that the data size is very well
>>> under the thresholds.
>>>
>>> Are there any other ways to find what could be the issue and how we can
>>> rectify this issue?
>>>
>>> Could the data characteristics be an issue?
>>>
>>> Any help would be immensely appreciated.
>>>
>>> Thanks
>>>
>>


Re: [Issue] Spark SQL - broadcast failure

2024-07-23 Thread Sudharshan V
Hi all, apologies for the delayed response.

We are using spark version 3.4.1 in jar and EMR 6.11 runtime.

We have disabled the auto broadcast always and would broadcast the smaller
tables using explicit broadcast.

It was working fine historically and only now it is failing.

The data sizes I mentioned was taken from S3.

Thanks,
Sudharshan

On Wed, 17 Jul, 2024, 1:53 am Meena Rajani,  wrote:

> Can you try disabling broadcast join and see what happens?
>
> On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V 
> wrote:
>
>> Hi all,
>>
>> Been facing a weird issue lately.
>> In our production code base , we have an explicit broadcast for a small
>> table.
>> It is just a look up table that is around 1gb in size in s3 and just had
>> few million records and 5 columns.
>>
>> The ETL was running fine , but with no change from the codebase nor the
>> infrastructure, we are getting broadcast failures. Even weird fact is the
>> older size of the data is 1.4gb while for the new run is just 900 MB
>>
>> Below is the error message
>> Cannot broadcast table that is larger than 8 GB : 8GB.
>>
>> I find it extremely weird considering that the data size is very well
>> under the thresholds.
>>
>> Are there any other ways to find what could be the issue and how we can
>> rectify this issue?
>>
>> Could the data characteristics be an issue?
>>
>> Any help would be immensely appreciated.
>>
>> Thanks
>>
>


Re: issue forwarding SPARK_CONF_DIR to start workers

2024-07-20 Thread Holden Karau
This might a good discussion for the dev@ list, I don’t know much about
SLURM deployments personally.

Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.):
https://amzn.to/2MaRAG9  
YouTube Live Streams: https://www.youtube.com/user/holdenkarau


On Sat, Jul 20, 2024 at 11:00 AM Patrice Duroux 
wrote:

> Hi,
>
> Here is a small patch that solves this issue.
> Considering all the scripts, I'm not sure if sbin/stop-workers.sh and
> sbin/stop-worker.sh need a similar change.
> Do they really care about SPARK_CONF_DIR to do the job?
>
> Note that I have also removed the following part in the script:
> cd "${SPARK_HOME}" \;
> in the command to workers.sh
> To me, it doesn't seem helpful unless CWD is important, but it shouldn't..
>
> Still in this consideration of minimalism, I think it could also be
> removed from:
>
> sbin/start-workers.sh:46:"${SPARK_HOME}/sbin/workers.sh" cd
> "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh"
> "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
> sbin/stop-workers.sh:28:"${SPARK_HOME}/sbin/workers.sh" cd
> "${SPARK_HOME}" \; "${SPARK_HOME}/sbin"/stop-worker.sh
> sbin/spark-daemons.sh:36:exec "${SPARK_HOME}/sbin/workers.sh" cd
> "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/spark-daemon.sh" "$@"
>
> Regards,
> Patrice
>
> Le jeu. 18 juil. 2024 à 15:34, Patrice Duroux
>  a écrit :
> >
> > Hi,
> >
> > I'm trying to build a SLURM script to start a Spark environment
> > (master+workers) dynamically configured by the job sent to the queue.
> > Issue during the job execution is to start all the workers that are
> > then using a default (empty) configuration.
> > How could I "forward" the SPARK_CONF_DIR at this step?
> > Using SPARK_SSH_OPTS in sbin/workers.sh is of no help because adding
> > -o SendEnv requires
> > authorization in sshd. Is there any way to add option/parameters to
> > the ssh command?
> > Currently, here is the corresponding call in start-workers.sh
> >
> > "${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \;
> > "${SPARK_HOME}/sbin/start-worker.sh"
> > "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
> >
> > Also modifying files like .profile or .bashrc, etc. is risky and not a
> > solution mainly here because each job will have its own conf_dir and
> > multiple jobs could be run in parallel.
> >
> > Many thanks!
> >
> > Regards,
> > Patrice
> >
> > Here is a sample of such a script:
> >
> > #!/usr/bin/sh
> >
> > #SBATCH -N 2
> > #SBATCH --time=00:05:00
> >
> > SPARK_HOME="$WORK"/spark-3.5.1-bin-hadoop3
> >
> > create_spark_conf(){
> > export SPARK_LOCAL_DIRS=$(mktemp -d spark-)
> > export SPARK_CONF_DIR="$SPARK_LOCAL_DIRS"/conf
> > mkdir -p $SPARK_CONF_DIR
> > echo "export SPARK_LOCAL_DIRS=\"$(realpath "$SPARK_LOCAL_DIRS")\"
> > export SPARK_CONF_DIR=\"$(realpath "$SPARK_LOCAL_DIRS")/conf\"
> > export SPARK_LOG_DIR=\"$(realpath "$SPARK_LOCAL_DIRS")/logs\"
> > module load openjdk/11.0.2
> > " > "$SPARK_CONF_DIR"/spark-env.sh
> > scontrol show hostname $SLURM_JOB_NODELIST > "$SPARK_CONF_DIR"/workers
> > }
> >
> > cd "$SCRATCH"
> > create_spark_conf
> > "$SPARK_HOME"/sbin/start-all.sh
> > "$SPARK_HOME"/bin/spark-submit "$HOME"/testspark-0.0.1-SNAPSHOT.jar "$@"
> > "$SPARK_HOME"/sbin/stop-all.sh
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: issue forwarding SPARK_CONF_DIR to start workers

2024-07-20 Thread Patrice Duroux
Hi,

Here is a small patch that solves this issue.
Considering all the scripts, I'm not sure if sbin/stop-workers.sh and
sbin/stop-worker.sh need a similar change.
Do they really care about SPARK_CONF_DIR to do the job?

Note that I have also removed the following part in the script:
cd "${SPARK_HOME}" \;
in the command to workers.sh
To me, it doesn't seem helpful unless CWD is important, but it shouldn't..

Still in this consideration of minimalism, I think it could also be
removed from:

sbin/start-workers.sh:46:"${SPARK_HOME}/sbin/workers.sh" cd
"${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh"
"spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
sbin/stop-workers.sh:28:"${SPARK_HOME}/sbin/workers.sh" cd
"${SPARK_HOME}" \; "${SPARK_HOME}/sbin"/stop-worker.sh
sbin/spark-daemons.sh:36:exec "${SPARK_HOME}/sbin/workers.sh" cd
"${SPARK_HOME}" \; "${SPARK_HOME}/sbin/spark-daemon.sh" "$@"

Regards,
Patrice

Le jeu. 18 juil. 2024 à 15:34, Patrice Duroux
 a écrit :
>
> Hi,
>
> I'm trying to build a SLURM script to start a Spark environment
> (master+workers) dynamically configured by the job sent to the queue.
> Issue during the job execution is to start all the workers that are
> then using a default (empty) configuration.
> How could I "forward" the SPARK_CONF_DIR at this step?
> Using SPARK_SSH_OPTS in sbin/workers.sh is of no help because adding
> -o SendEnv requires
> authorization in sshd. Is there any way to add option/parameters to
> the ssh command?
> Currently, here is the corresponding call in start-workers.sh
>
> "${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \;
> "${SPARK_HOME}/sbin/start-worker.sh"
> "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
>
> Also modifying files like .profile or .bashrc, etc. is risky and not a
> solution mainly here because each job will have its own conf_dir and
> multiple jobs could be run in parallel.
>
> Many thanks!
>
> Regards,
> Patrice
>
> Here is a sample of such a script:
>
> #!/usr/bin/sh
>
> #SBATCH -N 2
> #SBATCH --time=00:05:00
>
> SPARK_HOME="$WORK"/spark-3.5.1-bin-hadoop3
>
> create_spark_conf(){
> export SPARK_LOCAL_DIRS=$(mktemp -d spark-)
> export SPARK_CONF_DIR="$SPARK_LOCAL_DIRS"/conf
> mkdir -p $SPARK_CONF_DIR
> echo "export SPARK_LOCAL_DIRS=\"$(realpath "$SPARK_LOCAL_DIRS")\"
> export SPARK_CONF_DIR=\"$(realpath "$SPARK_LOCAL_DIRS")/conf\"
> export SPARK_LOG_DIR=\"$(realpath "$SPARK_LOCAL_DIRS")/logs\"
> module load openjdk/11.0.2
> " > "$SPARK_CONF_DIR"/spark-env.sh
> scontrol show hostname $SLURM_JOB_NODELIST > "$SPARK_CONF_DIR"/workers
> }
>
> cd "$SCRATCH"
> create_spark_conf
> "$SPARK_HOME"/sbin/start-all.sh
> "$SPARK_HOME"/bin/spark-submit "$HOME"/testspark-0.0.1-SNAPSHOT.jar "$@"
> "$SPARK_HOME"/sbin/stop-all.sh
diff --git a/sbin/start-worker.sh b/sbin/start-worker.sh
index fd58f01..bb808d5 100755
--- a/sbin/start-worker.sh
+++ b/sbin/start-worker.sh
@@ -39,8 +39,8 @@ fi
 # Any changes need to be reflected there.
 CLASS="org.apache.spark.deploy.worker.Worker"
 
-if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/start-worker.sh  [options]"
+if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ "$@" = *--config ]]; then
+  echo "Usage: ./sbin/start-worker.sh [--config ]  [options]"
   pattern="Usage:"
   pattern+="\|Using Spark's default log4j profile:"
   pattern+="\|Started daemon with process name"
@@ -52,6 +52,22 @@ fi
 
 . "${SPARK_HOME}/sbin/spark-config.sh"
 
+# Check if --config is passed as an argument. It is an optional parameter.
+# Exit if the argument is not a directory.
+if [ "$1" == "--config" ]
+then
+  shift
+  conf_dir="$1"
+  if [ ! -d "$conf_dir" ]
+  then
+echo "ERROR: $conf_dir is not a directory"
+exit 1
+  else
+export SPARK_CONF_DIR="$conf_dir"
+  fi
+  shift
+fi
+
 . "${SPARK_HOME}/bin/load-spark-env.sh"
 
 # First argument should be the master; we need to store it aside because we may
diff --git a/sbin/start-workers.sh b/sbin/start-workers.sh
index 3867ef3..c891545 100755
--- a/sbin/start-workers.sh
+++ b/sbin/start-workers.sh
@@ -43,4 +43,4 @@ if [ "$SPARK_MASTER_HOST" = "" ]; then
 fi
 
 # Launch the workers
-"${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
+"${SPARK_HOME}/sbin/workers.sh" "${SPARK_HOME}/sbin/start-worker.sh" --config "$SPARK_CONF_DIR" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: [Issue] Spark SQL - broadcast failure

2024-07-16 Thread Meena Rajani
Can you try disabling broadcast join and see what happens?

On Mon, Jul 8, 2024 at 12:03 PM Sudharshan V 
wrote:

> Hi all,
>
> Been facing a weird issue lately.
> In our production code base , we have an explicit broadcast for a small
> table.
> It is just a look up table that is around 1gb in size in s3 and just had
> few million records and 5 columns.
>
> The ETL was running fine , but with no change from the codebase nor the
> infrastructure, we are getting broadcast failures. Even weird fact is the
> older size of the data is 1.4gb while for the new run is just 900 MB
>
> Below is the error message
> Cannot broadcast table that is larger than 8 GB : 8GB.
>
> I find it extremely weird considering that the data size is very well
> under the thresholds.
>
> Are there any other ways to find what could be the issue and how we can
> rectify this issue?
>
> Could the data characteristics be an issue?
>
> Any help would be immensely appreciated.
>
> Thanks
>


Re: [Issue] Spark SQL - broadcast failure

2024-07-16 Thread Mich Talebzadeh
It will help if you mention the Spark version and the piece of problematic
code

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 16 Jul 2024 at 08:51, Sudharshan V 
wrote:

>
> On Mon, 8 Jul, 2024, 7:53 pm Sudharshan V, 
> wrote:
>
>> Hi all,
>>
>> Been facing a weird issue lately.
>> In our production code base , we have an explicit broadcast for a small
>> table.
>> It is just a look up table that is around 1gb in size in s3 and just had
>> few million records and 5 columns.
>>
>> The ETL was running fine , but with no change from the codebase nor the
>> infrastructure, we are getting broadcast failures. Even weird fact is the
>> older size of the data is 1.4gb while for the new run is just 900 MB
>>
>> Below is the error message
>> Cannot broadcast table that is larger than 8 GB : 8GB.
>>
>> I find it extremely weird considering that the data size is very well
>> under the thresholds.
>>
>> Are there any other ways to find what could be the issue and how we can
>> rectify this issue?
>>
>> Could the data characteristics be an issue?
>>
>> Any help would be immensely appreciated.
>>
>> Thanks
>>
>


Re: Help wanted on securing spark with Apache Knox / JWT

2024-07-12 Thread Adam Binford
You need to use the spark.ui.filters setting on the history server
https://spark.apache.org/docs/latest/configuration.html#spark-ui:

spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter
spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.type=org.apache.hadoop.security.authentication.server.JWTRedirectAuthenticationHandler
spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.authentication.provider.url=https://
:8443/gateway/knoxsso/api/v1/websso
...etc

On Thu, Jul 11, 2024 at 4:18 PM Thomas Mauran
 wrote:

> Hello,
> I am sending this email to the mailing list, to get your help on a problem
> that I can't seem to resolve myself.
>
> I am trying to secure Spark history ui running with Yarn as master using
> Apache Knox.
>
> From the Knox configuration point of view I managed to secure the Spark
> service, if I go on https://:8443/gateway/default/spark3history I have to
> login using SSO then I get redirected to spark history server web ui which
> works as expected.
>
> But if I directly access Spark without getting logged in I don't get
> redirected to Knox login page which is what I would like to have, same as
> HDFS and YarnUI.
>
> From what I see in Spark documentation the webui needs to be protected
> using the filter system. I can't seem to find a filter to protect my Spark
> history UI using Knox, I protected both HDFS and Yarn by adding this in
> core-site.xml which works fine.
>
> 
> hadoop.http.authentication.type
>
> org.apache.hadoop.security.authentication.server.JWTRedirectAuthenticationHandler
> 
> hadoop.http.authentication.authentication.provider.url
> 
> https://:8443/gateway/knoxsso/api/v1/websso
>
>
> 
> hadoop.http.authentication.public.key.pem
> 
>
> Adding those properties allowed me to get redirected to knox host page
> when I didn't log in yet.
>
> I am wondering if you knew how to secure Spark history UI to have the same
> behavior.
>
> Do you know what configuration I am missing to redirect it back to the
> Knox gateway login page from the Spark history UI as for the other services
> where the JWT token is passed and used for keeping the user session ?
>
> I tried to play with the filters especially
> org.apache.hadoop.security.authentication.server.AuthenticationFilter but
> didn't manage to get anything working, so I don't even know if this is
> the right way to do it.
>
> Thanks for your answer
>
>

-- 
Adam Binford


Re: AttributeError: 'MulticlassMetrics' object has no attribute '_sc'

2024-06-23 Thread Saurabh Kumar
Please Unsubscribe me

On Mon, 24 Jun 2024 at 07:02, Azhuvath, RajeevX 
wrote:

> Getting the error “AttributeError: 'MulticlassMetrics' object has no
> attribute '_sc'” while executing the standalone attached code in a bare
> metal system.
>
>
>
> Thanks and Regards,
>
> Rajeev
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
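
Since the attached code is not in the archive, here is only a minimal, hedged
sketch of the expected MulticlassMetrics usage: it needs an active SparkContext
and an RDD of (prediction, label) float pairs, and the missing '_sc' attribute
is often a follow-on symptom of the constructor failing, for example when it is
given a DataFrame or a plain list instead of such an RDD:

from pyspark.sql import SparkSession
from pyspark.mllib.evaluation import MulticlassMetrics

spark = SparkSession.builder.appName("multiclass-metrics-demo").getOrCreate()
sc = spark.sparkContext

# RDD of (prediction, label) pairs of floats
prediction_and_labels = sc.parallelize(
    [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (2.0, 2.0)]
)
metrics = MulticlassMetrics(prediction_and_labels)
print(metrics.accuracy)
print(metrics.confusionMatrix().toArray())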


Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-06-20 Thread Anil Dasari
Hello @Tathagata Das 
Could you share your thoughts on
https://issues.apache.org/jira/browse/SPARK-48418 ? Let me know if you have
any questions. thanks.

Regards,
Anil

On Fri, May 24, 2024 at 12:13 AM Anil Dasari  wrote:

> It appears that structured streaming and Dstream have entirely different
> microbatch metadata representation
> Can someone assist me in finding the following Dstream microbatch metadata
> equivalent in Structured streaming.
>
> 1. microbatch timestamp : structured streaming foreachBatch gives batchID
> which is not a timestamp. Is there a way to get the microbatch timestamp ?
> 2. microbatch start event ?
> 3. scheduling delay of a microbatch ?
> 4. pending microbatches in case of fixed internal microbatches ?
>
> Thanks
>
> On Wed, May 22, 2024 at 5:23 PM Anil Dasari  wrote:
>
>> You are right.
>> - another question on migration. Is there a way to get the microbatch id
>> during the microbatch dataset `trasform` operation like in rdd transform ?
>> I am attempting to implement the following pseudo functionality with
>> structured streaming. In this approach, recordCategoriesMetadata is fetched
>> and rdd metrics like rdd size etc using microbatch idin the transform
>> operation.
>> ```code
>> val rddQueue = new mutable.Queue[RDD[Int]]()
>> // source components
>> val sources = Seq.empty[String]
>> val consolidatedDstream = sources
>> .map(source => {
>> val inputStream = ssc.queueStream(rddQueue)
>> inputStream.transform((rdd, ts) => {
>> // emit metrics of microbatch ts : rdd size etc.
>>
>> val recordCategories = rdd.map(..).collect();
>> val recordCategoriesMetadata = ...
>> rdd
>> .map(r =>
>> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>> (source, customRecord)
>> )
>> })
>> }
>> )
>> .reduceLeft(_ union _)
>>
>> consolidatedDstream
>> .foreachRDD((rdd, ts) => {
>> // get pipes for each source
>> val pipes = Seq.empty[String] // pipes of given source
>> pipes.foreach(pipe => {
>> val pipeSource = null; // get from pipe variable
>> val psRDD = rdd
>> .filter {
>> case (source, sourceRDD) => source.equals(pipeSource)
>> }
>> // apply pipe transformation and sink
>>
>> })
>> })
>> ```
>>
>> In structured streaming, it can look like -
>>
>> ```code
>> val consolidatedDstream = sources
>> .map(source => {
>> val inputStream = ... (for each source)
>> inputStream
>> }
>> )
>> .reduceLeft(_ union _)
>>
>> consolidatedDstream
>> .writeStream
>> .foreachBatch((ds, ts) => {
>> val newDS = ds.transform((internalDS => {
>> // emit metrics of microbatch ts : rdd size etc.
>>
>> val recordCategories = rdd.map(..).collect();
>> val recordCategoriesMetadata = ...
>> internalDS
>> .map(r =>
>> val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>> (source, customRecord)
>> )
>> })(... )
>> // get pipes for each source
>> val pipes = Seq.empty[String] // pipes of given source
>> pipes.foreach(pipe => {
>> val pipeSource = null; // get from pipe variable
>> val psRDD = newDS
>> .filter {
>> case (source, sourceDS) => source.equals(pipeSource)
>> }
>> // apply pipe transformation and sink
>>
>> })
>> })
>> ```
>> ^ is just pseudo code and still not sure if it works. Let me know your
>> suggestions if any. thanks.
>>
>> On Wed, May 22, 2024 at 8:34 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> The right way to associated microbatches when committing to external
>>> storage is to use the microbatch id that you can get in foreachBatch. That
>>> microbatch id guarantees that the data produced in the batch is the always
>>> the same no matter any recomputations (assuming all processing logic is
>>> deterministic). So you can commit the batch id + batch data together. And
>>> then async commit the batch id + offsets.
>>>
>>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari 
>>> wrote:
>>>
 Thanks Das, Mtich.

 Mitch,
 We process data from Kafka and write it to S3 in Parquet format using
 Dstreams. To ensure exactly-once delivery and prevent data loss, our
 process records micro-batch offsets to an external storage at the end of
 each micro-batch in foreachRDD, which is then used when the job restarts.

 Das,
 Thanks for sharing the details. I will look into them.
 Unfortunately, the listeners process is async and can't
 guarantee happens before association with microbatch to commit offsets to
 external storage. But still they will work. Is there a way to access
 lastProgress in foreachBatch ?


 On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> If you want to find what offset ranges are present in a microbatch in
> Structured Streaming, you have to look at the
> StreamingQuery.lastProgress or use the QueryProgressListener
> .
> Both of these approaches gives you access to the SourceProgress
> 
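
A minimal PySpark sketch of the pattern described above (paths and names are
illustrative, not from the thread): use the batchId passed into foreachBatch as
the idempotency key for the data write, and read the per-source offsets of the
most recently completed batch from lastProgress:

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-id-demo").getOrCreate()
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

def process_batch(batch_df, batch_id):
    # write the batch keyed by batch_id; overwrite makes a replay idempotent
    batch_df.write.mode("overwrite").parquet(f"/tmp/demo_sink/batch_id={batch_id}")

query = (stream.writeStream
         .foreachBatch(process_batch)
         .option("checkpointLocation", "/tmp/demo_checkpoint")
         .start())

time.sleep(10)
progress = query.lastProgress   # dict describing the last completed batch
if progress is not None:
    print(progress["batchId"], [s["endOffset"] for s in progress["sources"]])
query.stop()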

Re: Spark Decommission

2024-06-20 Thread Rajesh Mahindra
Thank Ahmed, thats useful information

On Wed, Jun 19, 2024 at 1:36 AM Khaldi, Ahmed 
wrote:

> Hey Rajesh,
>
>
>
> From my experience, it's a stable feature; however, you must keep in mind
> that it does not guarantee you will not lose the data on the pods of nodes
> getting a spot kill. Once you get a spot kill, you have 120s to give the
> node back to the cloud provider. This is when the decommission script
> starts, and sometimes 120s is enough to migrate the shuffle/RDD blocks,
> and sometimes it's not. In the end, it really depends on your workload and data.
>
>
>
>
>
> *Best regards,*
>
>
>
> *Ahmed Khaldi*
>
> Solutions Architect
>
>
>
> *NetApp Limited.*
>
> +33617424566 Mobile Phone
>
> kah...@netapp.com 
>
>
>
>
>
>
>
> *From: *Rajesh Mahindra 
> *Date: *Tuesday, 18 June 2024 at 23:54
> *To: *user@spark.apache.org 
> *Subject: *Spark Decommission
>
> Vous ne recevez pas souvent de courriers de la part de rjshmh...@gmail.com.
> Découvrez pourquoi cela est important
> 
>
>
>
> *EXTERNAL EMAIL - USE CAUTION when clicking links or attachments *
>
>
>
> Hi folks,
>
>
>
> I am planning to leverage the "Spark Decommission" feature in production
> since our company uses SPOT instances on Kubernetes. I wanted to get a
> sense of how stable the feature is for production usage and if any one has
> thoughts around trying it out in production, especially in kubernetes
> environment.
>
>
>
> Thanks,
>
> Rajesh
>


Re: Help in understanding Exchange in Spark UI

2024-06-20 Thread Mich Talebzadeh
OK, I gave an answer in StackOverflow. Happy reading

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 20 Jun 2024 at 17:15, Dhruv Singla  wrote:

> Hey Team
>
> I've posted a question on StackOverflow. The link is -
> https://stackoverflow.com/questions/78644118/understanding-exchange-in-spark-ui
>
> I haven't got any responses yet. If possible could you please look into
> it? If you need me to write the question in the mailing list, I can do that
> as well.
>
> Thanks & Regards
> Dhruv
>


Re: [SPARK-48423] Unable to save ML Pipeline to azure blob storage

2024-06-19 Thread Chhavi Bansal
Hello Team,
I am pinging back on this thread to get a pair of eyes on this issue.
Ticket:  https://issues.apache.org/jira/browse/SPARK-48423

On Thu, 6 Jun 2024 at 00:19, Chhavi Bansal  wrote:

> Hello team,
> I was exploring on how to save ML pipeline to azure blob storage, but was
> setback by an issue where it complains of  `fs.azure.account.key`  not
> being found in the configuration even when I have provided the values in
> the pipelineModel.option(key1,value1) field. I considered raising a
> ticket on spark https://issues.apache.org/jira/browse/SPARK-48423, where
> I describe the entire scenario. I tried debugging the code and found that
> this key is being explicitly asked for in the code. The only solution was
> to again set it part of spark.conf which could result to a race condition
> since we work on multi-tenant architecture.
>
>
>
> Since saving to Azure blob storage would be common, Can someone please
> guide me if I am missing something in the `.option` clause?
>
>
>
> I would be happy to make a contribution to the code if someone can shed
> some light on how this could be solved.
>
> --
> Thanks and Regards,
> Chhavi Bansal
>


-- 
Thanks and Regards,
Chhavi Bansal
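
For context, a minimal sketch of the session-level workaround mentioned above,
the one with the multi-tenancy caveat (storage account, container and key are
placeholders; the setting goes through the shared SparkContext's Hadoop
configuration, which is exactly why it can race between tenants):

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.getOrCreate()

# register the account key on the session-wide Hadoop configuration
spark.sparkContext._jsc.hadoopConfiguration().set(
    "fs.azure.account.key.<storage-account>.dfs.core.windows.net",
    "<account-key>",
)

df = spark.createDataFrame([("a",), ("b",)], ["category"])
model = Pipeline(
    stages=[StringIndexer(inputCol="category", outputCol="category_idx")]
).fit(df)
model.write().overwrite().save(
    "abfss://<container>@<storage-account>.dfs.core.windows.net/models/demo_pipeline"
)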


Re: Spark Decommission

2024-06-19 Thread Khaldi, Ahmed
Hey Rajesh,

From my experience, it's a stable feature; however, you must keep in mind that
it does not guarantee you will not lose the data on the pods of nodes getting a
spot kill. Once you get a spot kill, you have 120s to give the node back to the
cloud provider. This is when the decommission script starts, and sometimes 120s
is enough to migrate the shuffle/RDD blocks, and sometimes it's not. In the end,
it really depends on your workload and data.


Best regards,

Ahmed Khaldi
Solutions Architect

NetApp Limited.
+33617424566 Mobile Phone
kah...@netapp.com



From: Rajesh Mahindra 
Date: Tuesday, 18 June 2024 at 23:54
To: user@spark.apache.org 
Subject: Spark Decommission


Hi folks,

I am planning to leverage the "Spark Decommission" feature in production since 
our company uses SPOT instances on Kubernetes. I wanted to get a sense of how 
stable the feature is for production usage and if any one has thoughts around 
trying it out in production, especially in kubernetes environment.

Thanks,
Rajesh


Re: Update mode in spark structured streaming

2024-06-15 Thread Mich Talebzadeh
Best to qualify your thoughts with an example

By using the foreachBatch function combined with the update output mode in
Spark Structured Streaming, you can effectively handle and integrate
late-arriving data into your aggregations. This approach will allow you to
continuously update your aggregated results with both on-time and late data

example

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, window, sum as spark_sum, max
as spark_max, current_timestamp

# Create Spark session
spark = SparkSession.builder.appName("exampleWithRate").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Simulate a stream of data with an event time
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
base_timestamp = current_timestamp()
stream = stream.withColumn("event_time", (base_timestamp + (col("value") *
60).cast("interval second")).cast("timestamp"))
stream = stream.withColumn("value", col("value") % 10)

def process_batch(batch_df, batch_id):
# Read current state from an external store (simulated here as a static
DataFrame)
current_state = spark.createDataFrame(
[(1, 10, '2024-06-13 10:00:00')],
["key", "total_value", "max_event_time"]
).withColumn("max_event_time", col("max_event_time").cast("timestamp"))

# Perform aggregation including late data handling
aggregated_batch = batch_df.groupBy("value").agg(
spark_sum("value").alias("total_value"),
spark_max("event_time").alias("max_event_time")
)

# Merge with current state
merged_state = current_state.union(aggregated_batch)

# Show the merged state
merged_state.show(truncate=False)

# Define your streaming query
streaming_query = (
stream
.withWatermark("event_time", "10 minutes")
.writeStream
.foreachBatch(process_batch)
.outputMode("update")
.start()
)
# Await termination
streaming_query.awaitTermination()


and the output

+---+---+---+
|key|total_value|max_event_time |
+---+---+---+
|1  |10 |2024-06-13 10:00:00|
+---+---+---+

+---+---+---+
|key|total_value|max_event_time |
+---+---+---+
|1  |10 |2024-06-13 10:00:00|
|0  |0  |2024-06-15 16:22:23.642|
|8  |8  |2024-06-15 16:20:23.642|
|2  |4  |2024-06-15 16:24:23.642|
|4  |8  |2024-06-15 16:26:23.642|
|9  |9  |2024-06-15 16:21:23.642|
|5  |5  |2024-06-15 16:17:23.642|
|1  |2  |2024-06-15 16:23:23.642|
|3  |6  |2024-06-15 16:25:23.642|
|6  |6  |2024-06-15 16:18:23.642|
|7  |7  |2024-06-15 16:19:23.642|
+---+---+---+

HTH


Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Fri, 14 Jun 2024 at 20:13, Om Prakash  wrote:

> Hi Team,
>
> Hope you all are doing well. I have run into a use case in which I want to
> do the aggregation in foreachbatch and use update mode for handling late
> data in structured streaming. Will this approach work in effectively
> capturing late arriving data in the aggregations?  Please help.
>
>
>
> Thanking you,
> Om Prakash
>


Re: Re: OOM issue in Spark Driver

2024-06-11 Thread Mich Talebzadeh
In a nutshell, the culprit for the OOM  issue in your Spark driver appears
to be memory leakage or inefficient memory usage within your application.
This could be caused by factors such as:

   1. Accumulation of data or objects in memory over time without proper
   cleanup.
   2. Inefficient data processing or transformations leading to excessive
   memory usage.
   3. Long-running tasks or stages that accumulate memory usage.
   4. Suboptimal Spark configuration settings, such as insufficient memory
   allocation for the driver or executors.
   5. Check stages and executor tabs in Spark GUI

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

PhD Imperial College London
London, United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".




On Tue, 11 Jun 2024 at 06:50, Lingzhe Sun  wrote:

> Hi Kathick,
>
> That suggests you're not performing stateful operations and therefore
> there're no state related metrics. You should consider other aspects that
> may cause OOM.
> Checking logs will always be a good start. And it would be better if some
> colleague of you is familiar with JVM and OOM related issues.
>
> BS
> Lingzhe Sun
>
>
> *From:* Karthick Nk 
> *Date:* 2024-06-11 13:28
> *To:* Lingzhe Sun 
> *CC:* Andrzej Zera ; User 
> *Subject:* Re: Re: OOM issue in Spark Driver
> Hi Lingzhe,
>
> I am able to get the below stats(i.e input rate, process rate, input rows
> etc..), but not able to find the exact stats that Andrzej asking (ie. 
> Aggregated
> Number Of Total State Rows), Could you guide me on how do I get those
> details for states under structured streaming.
> [image: image.png]
>
> Details:
> I am using Databricks runtime version: 13.3 LTS (includes Apache Spark
> 3.4.1, Scala 2.12)
> Driver and worker type:
> [image: image.png]
>
>
> Thanks
>
>
> On Tue, Jun 11, 2024 at 7:34 AM Lingzhe Sun 
> wrote:
>
>> Hi Kathick,
>>
>> I believed that what Andrzej means is that you should check
>> Aggregated Number Of Total State Rows
>> metirc which you could find in the structured streaming UI tab, which
>> indicate the total number of your states, only if you perform stateful
>> operations. If that increase indefinitely, you should probably check your
>> code logic.
>>
>> BS
>> Lingzhe Sun
>>
>>
>> *From:* Karthick Nk 
>> *Date:* 2024-06-09 14:45
>> *To:* Andrzej Zera 
>> *CC:* user 
>> *Subject:* Re: OOM issue in Spark Driver
>> Hi Andrzej,
>>
>> We are using both driver and workers too,
>> Details are as follows
>> Driver size:128GB Memory, 64 cores.
>> Executors size: 64GB Memory, 32 Cores (Executors 1 to 10 - Autoscale)
>>
>> Workers memory usage:
>> One of the worker memory usage screenshot:
>> [image: image.png]
>>
>>
>> State metrics details below:
>> [image: image.png]
>> [image: image.png]
>>
>> I am not getting memory-related info from the structure streaming tab,
>> Could you help me here?
>>
>> Please let me know if you need more details.
>>
>> If possible we can connect once at your time and look into the issue
>> which will be more helpful to me.
>>
>> Thanks
>>
>> On Sat, Jun 8, 2024 at 2:41 PM Andrzej Zera 
>> wrote:
>>
>>> Hey, do you perform stateful operations? Maybe your state is growing
>>> indefinitely - a screenshot with state metrics would help (you can find it
>>> in Spark UI -> Structured Streaming -> your query). Do you have a
>>> driver-only cluster or do you have workers too? What's the memory usage
>>> profile at workers?
>>>
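
For the question above about reading the state metrics programmatically rather
than from the UI, a minimal sketch (field names as of Spark 3.3+; the rate
source and console sink are only there to make it self-contained, and the
sleep just waits for a first batch to complete):

import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("state-metrics-demo").getOrCreate()

# a small stateful aggregation so that stateOperators is populated
events = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
counts = (events
          .withWatermark("timestamp", "1 minute")
          .groupBy(window(col("timestamp"), "30 seconds"))
          .count())

query = counts.writeStream.outputMode("update").format("console").start()

time.sleep(15)
progress = query.lastProgress
if progress and progress.get("stateOperators"):
    for op in progress["stateOperators"]:
        # numRowsTotal is the "Aggregated Number Of Total State Rows" in the UI
        print(op.get("operatorName"), op["numRowsTotal"], op["memoryUsedBytes"])
query.stop()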

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-10 Thread Gourav Sengupta
Hi,

another thing we can consider while parallelising connection with the
upstream sources is that it means you are querying the system
simultaneously and that causes usage spikes, and in case  the source system
is facing a lot of requests during production workloads the best time to
parallelise workloads from upstream systems is during their off peak hours.

It also kind of makes sense in a way that you are replicating data when
there are less changes in the upstream system, but sometimes businesses may
need very low latency replication to happen, in such cases using reader
instances (in case you are using AWS RDS), or reading database redo log
files, or their replication to brokers helps.

Each individual partitions are processed independently, in case you have
upsert operations running on the target based on which multiple upsert/
merge operations can  occur the best strategy would be first to
replicate the table in a staging area and then do the upsert/ merge
operation to the target.


Regards,
Gourav Sengupta
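
For reference, a minimal sketch of what a partitioned JDBC read looks like in
PySpark (connection details are placeholders): Spark issues numPartitions
parallel queries, each covering a slice of partitionColumn between lowerBound
and upperBound, which is where the concurrent load on the source system
mentioned above comes from:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/sales")  # placeholder
      .option("dbtable", "public.orders")                     # placeholder
      .option("user", "reader")                               # placeholder
      .option("password", "secret")                           # placeholder
      .option("partitionColumn", "order_id")
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "8")
      .load())

print(df.rdd.getNumPartitions())  # expect 8 parallel read partitions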

On Fri, Jun 7, 2024 at 1:01 AM Perez  wrote:

> Also can I take my lower bound starting from 1 or is it index?
>
> On Thu, Jun 6, 2024 at 8:42 PM Perez  wrote:
>
>> Thanks again Mich. It gives the clear picture but I have again couple of
>> doubts:
>>
>> 1) I know that there will be multiple threads that will be executed with
>> 10 segment sizes each until the upper bound is reached but I didn't get
>> this part of the code exactly segments = [(i, min(i + segment_size,
>> upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>
>> 2) Also performing union on these small dataframes won't impact
>> performance right? since spark has to shuffle and combine less data from
>> these dataframes?
>>
>>
>> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
>> wrote:
>>
>>> well you can dynamically determine the upper bound by first querying the
>>> database to find the maximum value of the partition column and use it as
>>> the upper bound for your partitioning logic.
>>>
>>> def get_max_value(spark, mongo_config, column_name):
>>> max_value_df =
>>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>> return max_value
>>>
>>> # Define your MongoDB config without the bounds first
>>> mongo_config_base = {
>>> "uri": "mongodb://username:password@host:port/database.collection",
>>> "partitionColumn": "_id"
>>> }
>>>
>>> # Fetch the dynamic upper bound
>>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>>
>>> # Define your segment size
>>> segment_size = 10
>>> lower_bound = 0
>>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>>> range(lower_bound, upper_bound, segment_size)]
>>>
>>> Then you need to aggregate DF from multiple threads When loading data in
>>> parallel, each thread will load a segment of data into its own DataFrame.
>>> To aggregate all these DataFrames into a single DataFrame, you can use t*he
>>> union method in PySpark.*
>>>
>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>> from pyspark.sql import SparkSession
>>>
>>> def extract_data_from_mongodb(mongo_config):
>>> df =
>>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>> return df
>>>
>>> # Function to get the maximum value of the partition column
>>> def get_max_value(spark, mongo_config, column_name):
>>> max_value_df =
>>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>> return max_value
>>>
>>> # MongoDB configuration without bounds
>>> mongo_config_base = {
>>> "uri": "mongodb://username:password@host:port/database.collection",
>>> "partitionColumn": "_id"
>>> }
>>>
>>> # Initialize Spark session
>>> spark = SparkSession.builder \
>>> .appName("MongoDBDataLoad") \
>>> .config("spark.mongodb.input.uri", 
>>> "mongodb://username:password@host:port/database.collection")
>>> \
>>> .getOrCreate()
>>>
>>> # Fetch the dynamic upper bound
>>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>>
>>> # Define your segment size
>>> segment_size = 10
>>> lower_bound = 0
>>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>>> range(lower_bound, upper_bound, segment_size)]
>>>
>>> # Function to load a segment
>>> def load_segment(segment):
>>> segment_lower_bound, segment_upper_bound = segment
>>> mongo_config = mongo_config_base.copy()
>>> mongo_config["lowerBound"] = str(segment_lower_bound)
>>> mongo_config["upperBound"] = str(segment_upper_bound)
>>> return extract_data_from_mongodb(mongo_config)
>>>
>>> # Collect all DataFrames from threads
>>> all_dfs = []
>>>
>>> with ThreadPoolExecutor() as executor:
>>> futures = [executor.submit(load_segment, segment) for segment in
>>> segments]
>>> for future in 

Re: Unable to load MongoDB atlas data via PySpark because of BsonString error

2024-06-09 Thread Perez
Hi Team,

Any help in this matter would be greatly appreciated.

TIA

On Sun, Jun 9, 2024 at 11:26 AM Perez  wrote:

> Hi Team,
>
> this is the problem
> https://stackoverflow.com/questions/78593858/unable-to-load-mongodb-atlas-data-via-pyspark-jdbc-in-glue
>
> I can't go ahead with *StructType* approach since my input record is huge
> and if the underlying attributes are added or removed my code might fail.
>
> I can't change the source data either.
>
> The only thing I can think of is loading via Python client with multiple
> threads but do let me know if there is another solution for this.
>
> TIA
>


Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-08 Thread Someshwar Kale
Hi Chhavi,

Currently there is no way to handle backticks (`) in a Spark StructType. Hence
the field names a.b and `a.b` are treated as completely different within a
StructType.

To handle that, I have added a custom implementation fixing
StringIndexer#validateAndTransformSchema. You can refer to the code on my GitHub.

*Regards,*
*Someshwar Kale *
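
A rough PySpark equivalent of the rename-first workaround (a sketch, not from
the ticket; the column names and data are made up): rename the dotted column
before it enters the pipeline so that StringIndexer never has to resolve a name
containing a dot:

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.getOrCreate()

# a flattened frame whose column name literally contains a dot
df = spark.createDataFrame(
    [("2.35", "paris"), ("-74.00", "nyc")],
    ["location.longitude", "city"],
)

# withColumnRenamed matches the literal column name, dot included
renamed = df.withColumnRenamed("location.longitude", "location_longitude")

indexer = StringIndexer(inputCol="location_longitude", outputCol="longitude_idx")
Pipeline(stages=[indexer]).fit(renamed).transform(renamed).show()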





On Sat, Jun 8, 2024 at 12:00 PM Chhavi Bansal 
wrote:

> Hi Someshwar,
> Thanks for the response, I have added my comments to the ticket.
>
>
> Thanks,
> Chhavi Bansal
>
> On Thu, 6 Jun 2024 at 17:28, Someshwar Kale  wrote:
>
>> As a fix, you may consider adding a transformer to rename columns
>> (perhaps replace all columns with dot to underscore) and use the renamed
>> columns in your pipeline as below-
>>
>> val renameColumn = new 
>> RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude")
>> val si = new 
>> StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee")
>> val pipeline = new Pipeline().setStages(Array(renameColumn, si))
>> pipeline.fit(flattenedDf).transform(flattenedDf).show()
>>
>>
>> refer my comment for elaboration.
>> Thanks!!
>>
>> *Regards,*
>> *Someshwar Kale*
>>
>>
>>
>>
>>
>> On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal 
>> wrote:
>>
>>> Hello team
>>> I was exploring feature transformation exposed via Mllib on nested
>>> dataset, and encountered an error while applying any transformer to a
>>> column with dot notation naming. I thought of raising a ticket on spark
>>> https://issues.apache.org/jira/browse/SPARK-48463, where I have
>>> mentioned the entire scenario.
>>>
>>> I wanted to get suggestions on what would be the best way to solve the
>>> problem while using the dot notation. One workaround is to use`_` while
>>> flattening the dataframe, but that would mean having an additional overhead
>>> to convert back to `.` (dot notation ) since that’s the convention for our
>>> other flattened data.
>>>
>>> I would be happy to make a contribution to the code if someone can shed
>>> some light on how this could be solved.
>>>
>>>
>>>
>>> --
>>> Thanks and Regards,
>>> Chhavi Bansal
>>>
>>
>
> --
> Thanks and Regards,
> Chhavi Bansal
>


Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-08 Thread Chhavi Bansal
Hi Someshwar,
Thanks for the response, I have added my comments to the ticket.


Thanks,
Chhavi Bansal

On Thu, 6 Jun 2024 at 17:28, Someshwar Kale  wrote:

> As a fix, you may consider adding a transformer to rename columns (perhaps
> replace all columns with dot to underscore) and use the renamed columns in
> your pipeline as below-
>
> val renameColumn = new 
> RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude")
> val si = new 
> StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee")
> val pipeline = new Pipeline().setStages(Array(renameColumn, si))
> pipeline.fit(flattenedDf).transform(flattenedDf).show()
>
>
> refer my comment for elaboration.
> Thanks!!
>
> *Regards,*
> *Someshwar Kale*
>
>
>
>
>
> On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal 
> wrote:
>
>> Hello team
>> I was exploring feature transformation exposed via Mllib on nested
>> dataset, and encountered an error while applying any transformer to a
>> column with dot notation naming. I thought of raising a ticket on spark
>> https://issues.apache.org/jira/browse/SPARK-48463, where I have
>> mentioned the entire scenario.
>>
>> I wanted to get suggestions on what would be the best way to solve the
>> problem while using the dot notation. One workaround is to use`_` while
>> flattening the dataframe, but that would mean having an additional overhead
>> to convert back to `.` (dot notation ) since that’s the convention for our
>> other flattened data.
>>
>> I would be happy to make a contribution to the code if someone can shed
>> some light on how this could be solved.
>>
>>
>>
>> --
>> Thanks and Regards,
>> Chhavi Bansal
>>
>

-- 
Thanks and Regards,
Chhavi Bansal


Re: OOM issue in Spark Driver

2024-06-08 Thread Andrzej Zera
Hey, do you perform stateful operations? Maybe your state is growing
indefinitely - a screenshot with state metrics would help (you can find it
in Spark UI -> Structured Streaming -> your query). Do you have a
driver-only cluster or do you have workers too? What's the memory usage
profile at workers?

Regards,
Andrzej


sob., 8 cze 2024 o 10:39 Karthick Nk  napisał(a):

> Hi All,
>
> I am using the pyspark structure streaming with Azure Databricks for data
> load process.
>
> In the Pipeline I am using a Job cluster and I am running only one
> pipeline, I am getting the OUT OF MEMORY issue while running for a
> long time. When I inspect the metrics of the cluster I found that, the
> memory usage getting increased by time by time even when there is no
> huge volume of data.
>
> [image: image.png]
>
>
> [image: image.png]
>
> After 4 hours of running the pipeline continuously, I am getting out of
> memory issue where used memory in the driver getting increased from 47 GB
> to 111 GB which is almost triple, I am unable to understand why this many
> memory occcupied in the driver. Am I missing anything here to notice? Could
> you guide me to figure out the root cause?
>
> Note:
> 1. I confirmed persist and unpersist that I used in code taken care
> properly for every batch execution.
> 2. Data is not increasing when time passes, (stream data getting almost
> same amount of data for every batch)
>
>
> Thanks,
>
>
>
>


Re: 7368396 - Apache Spark 3.5.1 (Support)

2024-06-07 Thread Sadha Chilukoori
Hi Alex,

Spark is open-source software available under the Apache License 2.0
(https://www.apache.org/licenses/); further details can be found in the
FAQ page (https://spark.apache.org/faq.html).

Hope this helps.


Thanks,

Sadha

On Thu, Jun 6, 2024, 1:32 PM SANTOS SOUZA, ALEX 
wrote:

> Hey guys!
>
>
>
> I am part of the team responsible for software approval at EMBRAER S.A.
> We are currently in the process of approving the Apache Spark 3.5.1
> software and are verifying the licensing of the application.
> Therefore, I would like to kindly request you to answer the questions
> below.
>
> -What type of software? (Commercial, Freeware, Component, etc...)
>  A:
>
> -What is the licensing model for commercial use? (Subscription, Perpetual,
> GPL, etc...)
> A:
>
> -What type of license? (By user, Competitor, Device, Server or others)?
> A:
>
> -Number of installations allowed per license/subscription?
> A:
>
> Can it be used in the defense and aerospace industry? (Company that
> manufactures products for national defense)
> A:
>
> -Does the license allow use in any location regardless of the origin of
> the purchase (tax restriction)?
> A:
>
> -Where can I find the End User License Agreement (EULA) for the version in
> question?
> A:
>
>
>
> Desde já, muito obrigado e qualquer dúvida estou à disposição. / Thank you
> very much in advance and I am at your disposal if you have any questions.
>
>
> Att,
>
>
> Alex Santos Souza
>
> Software Asset Management - Embraer
>
> WhatsApp: +55 12 99731-7579
>
> E-mail: alex.santosso...@dxc.com
>
> DXC Technology
>
> São José dos Campos, SP - Brazil
>
>


Re: Kubernetes cluster: change log4j configuration using uploaded `--files`

2024-06-06 Thread Mich Talebzadeh
The issue you are encountering is due to the order of operations when Spark
initializes the JVM for driver and executor pods. The JVM options
(-Dlog4j2.configurationFile) are evaluated when the JVM starts, but the
--files option copies the files after the JVM has already started. Hence,
the log4j configuration file is not found at the time the JVM is looking
for it.

In summary, you need to ensure the file is in place before the Spark driver
or executor JVM starts.

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 6 Jun 2024 at 17:04, Jennifer Wirth  wrote:

> Hi,
>
> I am trying to change the log4j configuration for jobs submitted to a k8s
> cluster (using submit)
>
> The my-log4j.xml is uploaded using --files ,./my-log4j.xml and the file
> in the working directory of the driver/exec pods.
>
> I added D-flags using the extra java options (and tried many different
> URIs, absolute, with and without file:.
>
> --conf spark.driver.extraJavaOptions="-Dlog4j2.debug=false 
> -Dlog4j2.configurationFile=file:./my-log4j.xml" \
> --conf spark.executor.extraJavaOptions="-Dlog4j2.debug=false 
> -Dlog4j2.configurationFile=file:./my-log4j.xml" \
>
> When debugging i notice that log4j is not able to load my configuration
> file. I see the following additional log entries:
>
> ERROR StatusLogger Reconfiguration failed: No configuration found for 
> '4a87761d' at 'null' in 'null'
> ERROR StatusLogger Reconfiguration failed: No configuration found for 
> 'Default' at 'null' in 'null'
> 24/06/06 09:20:44 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Files 
> file:///tmp/mk2/spark-upload-36b1f43d-1878-4b06-be5e-e25b703f28d5/orbit-movements.csv
>  from 
> /tmp/mk2/spark-upload-36b1f43d-1878-4b06-be5e-e25b703f28d5/orbit-movements.csv
>  to /opt/spark/work-dir/orbit-movements.csv
> Files 
> file:///tmp/mk2/spark-upload-cc211704-f481-4ebe-b6f0-5dbe66a7c639/my-log4j.xml
>  from /tmp/mk2/spark-upload-cc211704-f481-4ebe-b6f0-5dbe66a7c639/my-log4j.xml 
> to /opt/spark/work-dir/my-log4j.xml
> Files 
> file:///tmp/mk2/spark-upload-7970b482-7669-49aa-9f88-65191a83a18a/out.jar 
> from /tmp/mk2/spark-upload-7970b482-7669-49aa-9f88-65191a83a18a/out.jar to 
> /opt/spark/work-dir/out.jar
>
> The lines starting with Files in the logs of the Driver process, makes me
> wonder if the copying of files from my shared mount to the working
> directory happens in that process and is not something that happens before
> the java process launches. Is that assumption correct, as it would explain
> why my log4j config files are not found at JVM launch.
>
> If so, what is the recommended way to change the logging config *per job*
> when running spark in k8s (i am not using a custom container image, so
> can’t place it in there)
>
> tx.,
>


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Also can I take my lower bound starting from 1 or is it index?

On Thu, Jun 6, 2024 at 8:42 PM Perez  wrote:

> Thanks again Mich. It gives the clear picture but I have again couple of
> doubts:
>
> 1) I know that there will be multiple threads that will be executed with
> 10 segment sizes each until the upper bound is reached but I didn't get
> this part of the code exactly segments = [(i, min(i + segment_size,
> upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> 2) Also performing union on these small dataframes won't impact
> performance right? since spark has to shuffle and combine less data from
> these dataframes?
>
>
> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
> wrote:
>
>> well you can dynamically determine the upper bound by first querying the
>> database to find the maximum value of the partition column and use it as
>> the upper bound for your partitioning logic.
>>
>> def get_max_value(spark, mongo_config, column_name):
>> max_value_df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>> return max_value
>>
>> # Define your MongoDB config without the bounds first
>> mongo_config_base = {
>> "uri": "mongodb://username:password@host:port/database.collection",
>> "partitionColumn": "_id"
>> }
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>> range(lower_bound, upper_bound, segment_size)]
>>
>> Then you need to aggregate DF from multiple threads When loading data in
>> parallel, each thread will load a segment of data into its own DataFrame.
>> To aggregate all these DataFrames into a single DataFrame, you can use t*he
>> union method in PySpark.*
>>
>> from concurrent.futures import ThreadPoolExecutor, as_completed
>> from pyspark.sql import SparkSession
>>
>> def extract_data_from_mongodb(mongo_config):
>> df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> return df
>>
>> # Function to get the maximum value of the partition column
>> def get_max_value(spark, mongo_config, column_name):
>> max_value_df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>> return max_value
>>
>> # MongoDB configuration without bounds
>> mongo_config_base = {
>> "uri": "mongodb://username:password@host:port/database.collection",
>> "partitionColumn": "_id"
>> }
>>
>> # Initialize Spark session
>> spark = SparkSession.builder \
>> .appName("MongoDBDataLoad") \
>> .config("spark.mongodb.input.uri", 
>> "mongodb://username:password@host:port/database.collection")
>> \
>> .getOrCreate()
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>> range(lower_bound, upper_bound, segment_size)]
>>
>> # Function to load a segment
>> def load_segment(segment):
>> segment_lower_bound, segment_upper_bound = segment
>> mongo_config = mongo_config_base.copy()
>> mongo_config["lowerBound"] = str(segment_lower_bound)
>> mongo_config["upperBound"] = str(segment_upper_bound)
>> return extract_data_from_mongodb(mongo_config)
>>
>> # Collect all DataFrames from threads
>> all_dfs = []
>>
>> with ThreadPoolExecutor() as executor:
>> futures = [executor.submit(load_segment, segment) for segment in
>> segments]
>> for future in as_completed(futures):
>> try:
>> df_segment = future.result()
>> all_dfs.append(df_segment)
>> except Exception as e:
>> print(f"Error: {e}")
>>
>> # Union all DataFrames into a single DataFrame
>> if all_dfs:
>> final_df = all_dfs[0]
>> for df in all_dfs[1:]:
>> final_df = final_df.union(df)
>>
>> # Proceed with your final DataFrame
>> final_df.show()
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> PhD  Imperial
>> College London 
>> London, United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> 

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks again Mich. It gives the clear picture but I have again couple of
doubts:

1) I know that multiple threads will be executed, each handling a segment of
size 10, until the upper bound is reached, but I didn't understand this part
of the code exactly: segments = [(i, min(i + segment_size,
upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

2) Also, performing a union on these small dataframes won't impact performance,
right? Since Spark has to shuffle and combine less data from these
dataframes?
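
For illustration, a minimal sketch of what that comprehension produces, using
made-up bounds (lower_bound=0, upper_bound=35, segment_size=10): it chops the
[lower_bound, upper_bound) range into (start, end) pairs spanning at most
segment_size values, clamping the last pair at upper_bound.

lower_bound = 0
upper_bound = 35   # assumed value, for the example only
segment_size = 10

segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

print(segments)
# [(0, 10), (10, 20), (20, 30), (30, 35)]
# each tuple becomes one thread's (lowerBound, upperBound)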


On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
wrote:

> well you can dynamically determine the upper bound by first querying the
> database to find the maximum value of the partition column and use it as
> the upper bound for your partitioning logic.
>
> def get_max_value(spark, mongo_config, column_name):
> max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
> return max_value
>
> # Define your MongoDB config without the bounds first
> mongo_config_base = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id"
> }
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> Then you need to aggregate DF from multiple threads When loading data in
> parallel, each thread will load a segment of data into its own DataFrame.
> To aggregate all these DataFrames into a single DataFrame, you can use the
> union method in PySpark.
>
> from concurrent.futures import ThreadPoolExecutor, as_completed
> from pyspark.sql import SparkSession
>
> def extract_data_from_mongodb(mongo_config):
> df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> return df
>
> # Function to get the maximum value of the partition column
> def get_max_value(spark, mongo_config, column_name):
> max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
> return max_value
>
> # MongoDB configuration without bounds
> mongo_config_base = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id"
> }
>
> # Initialize Spark session
> spark = SparkSession.builder \
> .appName("MongoDBDataLoad") \
> .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
> .getOrCreate()
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Function to load a segment
> def load_segment(segment):
> segment_lower_bound, segment_upper_bound = segment
> mongo_config = mongo_config_base.copy()
> mongo_config["lowerBound"] = str(segment_lower_bound)
> mongo_config["upperBound"] = str(segment_upper_bound)
> return extract_data_from_mongodb(mongo_config)
>
> # Collect all DataFrames from threads
> all_dfs = []
>
> with ThreadPoolExecutor() as executor:
> futures = [executor.submit(load_segment, segment) for segment in
> segments]
> for future in as_completed(futures):
> try:
> df_segment = future.result()
> all_dfs.append(df_segment)
> except Exception as e:
> print(f"Error: {e}")
>
> # Union all DataFrames into a single DataFrame
> if all_dfs:
> final_df = all_dfs[0]
> for df in all_dfs[1:]:
> final_df = final_df.union(df)
>
> # Proceed with your final DataFrame
> final_df.show()
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 6 Jun 2024 at 10:52, Perez  wrote:
>
>> Thanks, Mich for your response. However, I have multiple doubts as below:
>>
>> 1) I am trying to load the data for the incremental batch so I am not
>> sure what 

Re: [SPARK-48463] Mllib Feature transformer failing with nested dataset (Dot notation)

2024-06-06 Thread Someshwar Kale
As a fix, you may consider adding a transformer to rename columns (for
example, replacing the dots in column names with underscores) and using the
renamed columns in your pipeline, as below:

val renameColumn = new
RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude")
val si = new 
StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee")
val pipeline = new Pipeline().setStages(Array(renameColumn, si))
pipeline.fit(flattenedDf).transform(flattenedDf).show()


Refer to my comment for elaboration.
Thanks!!

*Regards,*
*Someshwar Kale*





On Thu, Jun 6, 2024 at 3:24 AM Chhavi Bansal 
wrote:

> Hello team
> I was exploring feature transformation exposed via Mllib on nested
> dataset, and encountered an error while applying any transformer to a
> column with dot notation naming. I thought of raising a ticket on spark
> https://issues.apache.org/jira/browse/SPARK-48463, where I have mentioned
> the entire scenario.
>
> I wanted to get suggestions on what would be the best way to solve the
> problem while using the dot notation. One workaround is to use`_` while
> flattening the dataframe, but that would mean having an additional overhead
> to convert back to `.` (dot notation ) since that’s the convention for our
> other flattened data.
>
> I would be happy to make a contribution to the code if someone can shed
> some light on how this could be solved.
>
>
>
> --
> Thanks and Regards,
> Chhavi Bansal
>


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Mich Talebzadeh
well you can dynamically determine the upper bound by first querying the
database to find the maximum value of the partition column and use it as
the upper bound for your partitioning logic.

def get_max_value(spark, mongo_config, column_name):
max_value_df =
spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
return max_value

# Define your MongoDB config without the bounds first
mongo_config_base = {
"uri": "mongodb://username:password@host:port/database.collection",
"partitionColumn": "_id"
}

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound)) for i in
range(lower_bound, upper_bound, segment_size)]

Then you need to aggregate DF from multiple threads When loading data in
parallel, each thread will load a segment of data into its own DataFrame.
To aggregate all these DataFrames into a single DataFrame, you can use the
union method in PySpark.

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession

def extract_data_from_mongodb(mongo_config):
df =
spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
return df

# Function to get the maximum value of the partition column
def get_max_value(spark, mongo_config, column_name):
max_value_df =
spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
return max_value

# MongoDB configuration without bounds
mongo_config_base = {
"uri": "mongodb://username:password@host:port/database.collection",
"partitionColumn": "_id"
}

# Initialize Spark session
spark = SparkSession.builder \
.appName("MongoDBDataLoad") \
.config("spark.mongodb.input.uri",
"mongodb://username:password@host:port/database.collection")
\
.getOrCreate()

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound)) for i in
range(lower_bound, upper_bound, segment_size)]

# Function to load a segment
def load_segment(segment):
segment_lower_bound, segment_upper_bound = segment
mongo_config = mongo_config_base.copy()
mongo_config["lowerBound"] = str(segment_lower_bound)
mongo_config["upperBound"] = str(segment_upper_bound)
return extract_data_from_mongodb(mongo_config)

# Collect all DataFrames from threads
all_dfs = []

with ThreadPoolExecutor() as executor:
futures = [executor.submit(load_segment, segment) for segment in
segments]
for future in as_completed(futures):
try:
df_segment = future.result()
all_dfs.append(df_segment)
except Exception as e:
print(f"Error: {e}")

# Union all DataFrames into a single DataFrame
if all_dfs:
final_df = all_dfs[0]
for df in all_dfs[1:]:
final_df = final_df.union(df)

# Proceed with your final DataFrame
final_df.show()
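
As an aside, DataFrame.union is a narrow operation, so the union itself does
not trigger a shuffle; any shuffle comes from whatever you do with final_df
afterwards. The loop above can also be written more compactly with
functools.reduce, a sketch under the same assumptions as the code above:

from functools import reduce
from pyspark.sql import DataFrame

# Fold all segment DataFrames into one; equivalent to the explicit loop.
if all_dfs:
    final_df = reduce(DataFrame.union, all_dfs)
    final_df.show()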

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 6 Jun 2024 at 10:52, Perez  wrote:

> Thanks, Mich for your response. However, I have multiple doubts as below:
>
> 1) I am trying to load the data for the incremental batch so I am not sure
> what would be my upper bound. So what can we do?
> 2) So as each thread loads the desired segment size's data into a
> dataframe if I want to aggregate all the data from all the threads in a
> single dataframe what should I do? Keep on appending in a dataframe as it
> comes?
>
>
> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
> wrote:
>
>> Yes, partitioning and parallel loading can significantly improve the
>> performance of data extraction from JDBC sources or databases like MongoDB.
>> This approach can leverage Spark's distributed computing capabilities,
>> allowing you to load data in parallel, thus speeding up the overall data
>> loading process.
>>
>> When loading data from JDBC sources, specifying partitioning options
>> allows Spark to parallelize the data read operation. Here's how you 

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks, Mich for your response. However, I have multiple doubts as below:

1) I am trying to load the data for an incremental batch, so I am not sure
what my upper bound would be. So what can we do?
2) As each thread loads its segment of data into a dataframe, if I want to
aggregate all the data from all the threads into a single dataframe, what
should I do? Keep appending to a dataframe as each one arrives?


On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
wrote:

> Yes, partitioning and parallel loading can significantly improve the
> performance of data extraction from JDBC sources or databases like MongoDB.
> This approach can leverage Spark's distributed computing capabilities,
> allowing you to load data in parallel, thus speeding up the overall data
> loading process.
>
> When loading data from JDBC sources, specifying partitioning options
> allows Spark to parallelize the data read operation. Here's how you can do
> it for a JDBC source:
>
> Something like below given the information provided
>
> from pyspark.sql import SparkSession
> from concurrent.futures import ThreadPoolExecutor, as_completed
>
> def extract_data_from_mongodb(mongo_config):
> df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> return df
>
> # MongoDB configuration
> mongo_config_template = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id",
> "lowerBound": None,
> "upperBound": None
> }
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> # Create segments
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Initialize Spark session
> spark = SparkSession.builder \
> .appName("MongoDBDataLoad") \
> .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
> .getOrCreate()
>
> # Extract data in parallel using ThreadPoolExecutor
> def load_segment(segment):
> segment_lower_bound, segment_upper_bound = segment
> mongo_config = mongo_config_template.copy()
> mongo_config["lowerBound"] = str(segment_lower_bound)
> mongo_config["upperBound"] = str(segment_upper_bound)
> return extract_data_from_mongodb(mongo_config)
>
> with ThreadPoolExecutor() as executor:
> futures = [executor.submit(load_segment, segment) for segment in
> segments]
> for future in as_completed(futures):
> try:
> df_segment = future.result()
> # Process df_segment as needed
> except Exception as e:
> print(f"Error: {e}")
>
>
> ThreadPoolExecutor enables parallel execution of tasks using multiple
> threads. Each thread can be responsible for loading a segment of the data.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London  (voted 2nd
> best university in the world after MIT https://lnkd.in/eCPt6KTj)
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 6 Jun 2024 at 00:46, Perez  wrote:
>
>> Hello experts,
>>
>> I was just wondering if I could leverage the below thing to expedite the
>> loading of the data process in Spark.
>>
>>
>> def extract_data_from_mongodb(mongo_config): df =
>> glueContext.create_dynamic_frame.from_options( connection_type="mongodb",
>> connection_options=mongo_config ) return df
>>
>> mongo_config = { "connection.uri": "mongodb://url", "database": "",
>> "collection": "", "username": "", "password": "", "partitionColumn":"_id",
>> "lowerBound": str(lower_bound), "upperBound": str(upper_bound) }
>> lower_bound = 0 upper_bound = 200 segment_size = 10 segments = [(i, min(i
>> + segment_size, upper_bound)) for i in range(lower_bound, upper_bound,
>> segment_size)] with ThreadPoolExecutor() as executor: futures =
>> [executor.submit(execution, segment) for segment in segments] for future in
>> as_completed(futures): try: future.result() except Exception as e:
>> print(f"Error: {e}")
>>
>> I am trying to leverage the parallel threads to pull data in parallel. So
>> is it effective?
>>
>


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Mich Talebzadeh
Yes, partitioning and parallel loading can significantly improve the
performance of data extraction from JDBC sources or databases like MongoDB.
This approach can leverage Spark's distributed computing capabilities,
allowing you to load data in parallel, thus speeding up the overall data
loading process.

When loading data from JDBC sources, specifying partitioning options allows
Spark to parallelize the data read operation. Here's how you can do it for
a JDBC source:

Something like below given the information provided

from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed

def extract_data_from_mongodb(mongo_config):
df =
spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
return df

# MongoDB configuration
mongo_config_template = {
"uri": "mongodb://username:password@host:port/database.collection",
"partitionColumn": "_id",
"lowerBound": None,
"upperBound": None
}

lower_bound = 0
upper_bound = 200
segment_size = 10

# Create segments
segments = [(i, min(i + segment_size, upper_bound)) for i in
range(lower_bound, upper_bound, segment_size)]

# Initialize Spark session
spark = SparkSession.builder \
.appName("MongoDBDataLoad") \
.config("spark.mongodb.input.uri",
"mongodb://username:password@host:port/database.collection")
\
.getOrCreate()

# Extract data in parallel using ThreadPoolExecutor
def load_segment(segment):
segment_lower_bound, segment_upper_bound = segment
mongo_config = mongo_config_template.copy()
mongo_config["lowerBound"] = str(segment_lower_bound)
mongo_config["upperBound"] = str(segment_upper_bound)
return extract_data_from_mongodb(mongo_config)

with ThreadPoolExecutor() as executor:
futures = [executor.submit(load_segment, segment) for segment in
segments]
for future in as_completed(futures):
try:
df_segment = future.result()
# Process df_segment as needed
except Exception as e:
print(f"Error: {e}")


ThreadPoolExecutor enables parallel execution of tasks using multiple
threads. Each thread can be responsible for loading a segment of the data.
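
For reference, since the thread subject mentions JDBC sources: a minimal
sketch of the built-in partitioned JDBC read, which achieves the same
parallelism without manual threading. The URL, table name and bounds below
are placeholders, and for MongoDB the dedicated connector shown above would
be used instead.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JdbcPartitionedRead").getOrCreate()

# Spark issues one query per partition, each covering a slice of partitionColumn.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/mydb")   # placeholder URL
      .option("dbtable", "public.my_table")                # placeholder table
      .option("user", "username")
      .option("password", "password")
      .option("partitionColumn", "id")   # must be numeric, date or timestamp
      .option("lowerBound", "0")         # placeholder bounds
      .option("upperBound", "200")
      .option("numPartitions", "20")     # Spark derives the per-partition ranges
      .load())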

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London  (voted 2nd
best university in the world after MIT https://lnkd.in/eCPt6KTj)
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 6 Jun 2024 at 00:46, Perez  wrote:

> Hello experts,
>
> I was just wondering if I could leverage the below thing to expedite the
> loading of the data process in Spark.
>
>
> def extract_data_from_mongodb(mongo_config): df =
> glueContext.create_dynamic_frame.from_options( connection_type="mongodb",
> connection_options=mongo_config ) return df
>
> mongo_config = { "connection.uri": "mongodb://url", "database": "",
> "collection": "", "username": "", "password": "", "partitionColumn":"_id",
> "lowerBound": str(lower_bound), "upperBound": str(upper_bound) }
> lower_bound = 0 upper_bound = 200 segment_size = 10 segments = [(i, min(i
> + segment_size, upper_bound)) for i in range(lower_bound, upper_bound,
> segment_size)] with ThreadPoolExecutor() as executor: futures =
> [executor.submit(execution, segment) for segment in segments] for future in
> as_completed(futures): try: future.result() except Exception as e:
> print(f"Error: {e}")
>
> I am trying to leverage the parallel threads to pull data in parallel. So
> is it effective?
>


Re: Terabytes data processing via Glue

2024-06-05 Thread Perez
Thanks Nitin and Russel for your responses. Much appreciated.

On Mon, Jun 3, 2024 at 9:47 PM Russell Jurney 
wrote:

> You could use either Glue or Spark for your job. Use what you’re more
> comfortable with.
>
> Thanks,
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
> On Sun, Jun 2, 2024 at 9:59 PM Perez  wrote:
>
>> Hello,
>>
>> Can I get some suggestions?
>>
>> On Sat, Jun 1, 2024 at 1:18 PM Perez  wrote:
>>
>>> Hi Team,
>>>
>>> I am planning to load and process around 2 TB historical data. For that
>>> purpose I was planning to go ahead with Glue.
>>>
>>> So is it ok if I use glue if I calculate my DPUs needed correctly? or
>>> should I go with EMR.
>>>
>>> This will be a one time activity.
>>>
>>>
>>> TIA
>>>
>>


Re: Classification request

2024-06-04 Thread Dirk-Willem van Gulik
Actually - that answer may oversimplify things / be rather incorrect depending 
on the exact question of the entity that asks and the exact situation (who 
ships what code from where).

For this reason it is probably best to refer the original poster to:

https://www.apache.org/licenses/exports/

Which explains that Spark is classified as ECCN 5d002, and is subject to EAR.

 And to refer developers or those that need to bundle Spark, or otherwise place 
it on the market to:

https://infra.apache.org/crypto.html

And when in doubt - contact the Spark PMC or the ASF security team.

With kind regards,

Dw.


> On 4 Jun 2024, at 15:20, Artemis User  wrote:
> 
> Sara, Apache Spark is open source under Apache License 2.0 
> (https://github.com/apache/spark/blob/master/LICENSE).  It is not under 
> export control of any country!  Please feel free to use, reproduce and 
> distribute, as long as your practice is compliant with the license.
> 
> Having said that, some components in Apache Spark may be under other open 
> source licenses, whose terms and conditions may be different than Apache's.  
> 
> Cheers!
> ND
> 
> On 6/4/24 8:01 AM, VARGA, Sara wrote:
>> Hello,
>>  
>> my name is Sara and I am working in export control at MTU.
>>  
>> In order to ensure export compliance, we would like to be informed about the 
>> export control classification for your items:
>>  
>> Apache Spark 3.0.1
>> Apache Spark 3.2.1
>>  
>> Please be so kind and use the attached Supplier Export Control Declaration 
>> and return the completed excel file and a pdf scan of the signed document 
>> via email.
>>  
>> Thanks in advance
>>  
>>  
>> Mit freundlichen Gruessen / Best regards
>>  
>> 
   
>> Sara Varga
>> Exportkontrolle
>> Export control
>> 
>> MTU Aero Engines AG
>> Dachauer Str. 665 | 80995 Muenchen | Germany
>> 
>> 
>> sara.va...@mtu.de  | www.mtu.de 
>> 
>> 
>>  
>> 
>>  
>>  
>>  
>> --
>> MTU Aero Engines AG
>> Vorstand/Board of Management: Lars Wagner, Vorsitzender/CEO; Peter 
>> Kameritsch, Dr. Silke Maurer, Michael Schreyoegg
>> Vorsitzender des Aufsichtsrats/Chairman of the Supervisory Board: Gordon 
>> Riske
>> Sitz der Gesellschaft/Registered Office: Muenchen
>> Handelsregister/Commercial Register: Muenchen HRB 157206
>> Lobbyregister/Lobbying Register: R002076
>> 
>> Diese E-Mail sowie ihre Anhaenge enthalten MTU-eigene vertrauliche oder 
>> rechtlich geschuetzte Informationen.
>> Wenn Sie nicht der beabsichtigte Empfaenger sind, informieren Sie bitte den 
>> Absender und loeschen Sie diese
>> E-Mail sowie die Anhaenge. Das unbefugte Speichern, Kopieren oder 
>> Weiterleiten ist nicht gestattet.
>> 
>> This e-mail and any attached documents are proprietary to MTU, confidential 
>> or protected by law.
>> If you are not the intended recipient, please advise the sender and delete 
>> this message and its attachments.
>> Any unauthorised storing, copying or distribution is prohibited.
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 


Re: Classification request

2024-06-04 Thread Artemis User
Sara, Apache Spark is open source under Apache License 2.0 
(https://github.com/apache/spark/blob/master/LICENSE).  It is not under 
export control of any country!  Please feel free to use, reproduce and 
distribute, as long as your practice is compliant with the license.


Having said that, some components in Apache Spark may be under other 
open source licenses, whose terms and conditions may be different than 
Apache's.


Cheers!
ND

On 6/4/24 8:01 AM, VARGA, Sara wrote:


Hello,

my name is Sara and I am working in export control at MTU.

In order to ensure export compliance, we would like to be informed 
about the export control classification for your items:


*Apache Spark 3.0.1*

*Apache Spark 3.2.1*

Please be so kind and use the attached Supplier Export Control 
Declaration and return the completed excel file and a pdf scan of the 
signed document via email.


Thanks in advance

Mit freundlichen Gruessen / Best regards




*Sara Varga*
Exportkontrolle
/Export control/

*MTU Aero Engines AG*
Dachauer Str. 665 | 80995 Muenchen | Germany


sara.va...@mtu.de  | www.mtu.de 





--
*MTU Aero Engines AG*
Vorstand/Board of Management: Lars Wagner, Vorsitzender/CEO; Peter 
Kameritsch, Dr. Silke Maurer, Michael Schreyoegg
Vorsitzender des Aufsichtsrats/Chairman of the Supervisory Board: 
Gordon Riske

Sitz der Gesellschaft/Registered Office: Muenchen
Handelsregister/Commercial Register: Muenchen HRB 157206
Lobbyregister/Lobbying Register: R002076

Diese E-Mail sowie ihre Anhaenge enthalten MTU-eigene vertrauliche 
oder rechtlich geschuetzte Informationen.
Wenn Sie nicht der beabsichtigte Empfaenger sind, informieren Sie 
bitte den Absender und loeschen Sie diese
E-Mail sowie die Anhaenge. Das unbefugte Speichern, Kopieren oder 
Weiterleiten ist nicht gestattet.


This e-mail and any attached documents are proprietary to MTU, 
confidential or protected by law.
If you are not the intended recipient, please advise the sender and 
delete this message and its attachments.

Any unauthorised storing, copying or distribution is prohibited.

-
To unsubscribe e-mail:user-unsubscr...@spark.apache.org


Re: Terabytes data processing via Glue

2024-06-03 Thread Russell Jurney
You could use either Glue or Spark for your job. Use what you’re more
comfortable with.

Thanks,
Russell Jurney @rjurney 
russell.jur...@gmail.com LI  FB
 datasyndrome.com


On Sun, Jun 2, 2024 at 9:59 PM Perez  wrote:

> Hello,
>
> Can I get some suggestions?
>
> On Sat, Jun 1, 2024 at 1:18 PM Perez  wrote:
>
>> Hi Team,
>>
>> I am planning to load and process around 2 TB historical data. For that
>> purpose I was planning to go ahead with Glue.
>>
>> So is it ok if I use glue if I calculate my DPUs needed correctly? or
>> should I go with EMR.
>>
>> This will be a one time activity.
>>
>>
>> TIA
>>
>


Re: Terabytes data processing via Glue

2024-06-02 Thread Perez
Hello,

Can I get some suggestions?

On Sat, Jun 1, 2024 at 1:18 PM Perez  wrote:

> Hi Team,
>
> I am planning to load and process around 2 TB historical data. For that
> purpose I was planning to go ahead with Glue.
>
> So is it ok if I use glue if I calculate my DPUs needed correctly? or
> should I go with EMR.
>
> This will be a one time activity.
>
>
> TIA
>


Re: [s3a] Spark is not reading s3 object content

2024-05-31 Thread Amin Mosayyebzadeh
I am reading from a single file:
df = spark.read.text("s3a://test-bucket/testfile.csv")



On Fri, May 31, 2024 at 5:26 AM Mich Talebzadeh 
wrote:

> Tell Spark to read from a single file
>
> data = spark.read.text("s3a://test-bucket/testfile.csv")
>
> This clarifies to Spark that you are dealing with a single file and avoids
> any bucket-like interpretation.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh 
> wrote:
>
>> I will work on the first two possible causes.
>> For the third one, which I guess is the real problem, Spark treats the
>> testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket
>> to access _spark_metadata with url
>> s3a://test-bucket/testfile.csv/_spark_metadata
>> testfile.csv is an object and should not be treated as a bucket. But I am
>> not sure how to prevent Spark from doing that.
>>
>


Re: [s3a] Spark is not reading s3 object content

2024-05-31 Thread Mich Talebzadeh
Tell Spark to read from a single file

data = spark.read.text("s3a://test-bucket/testfile.csv")

This clarifies to Spark that you are dealing with a single file and avoids
any bucket-like interpretation.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Fri, 31 May 2024 at 09:53, Amin Mosayyebzadeh 
wrote:

> I will work on the first two possible causes.
> For the third one, which I guess is the real problem, Spark treats the
> testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket
> to access _spark_metadata with url
> s3a://test-bucket/testfile.csv/_spark_metadata
> testfile.csv is an object and should not be treated as a bucket. But I am
> not sure how to prevent Spark from doing that.
>


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-30 Thread Subhasis Mukherjee
Regarding the part about making the Spark writer faster: if you are (or can be) on Databricks,
check this out. It is fresh out of the oven at Databricks.

https://www.databricks.com/blog/announcing-general-availability-liquid-clustering?utm_source=bambu&utm_medium=social&utm_campaign=advocacy&blaid=6087618




From: Gera Shegalov 
Sent: Wednesday, May 29, 2024 7:57:56 am
To: Prem Sahoo 
Cc: eab...@163.com ; Vibhor Gupta ; 
user @spark 
Subject: Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

I agree with the previous answers that (if requirements allow it) it is much 
easier to just orchestrate a copy either in the same app or sync externally.

A long time ago and not for a Spark app we were solving a similar usecase via 
https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/ViewFs.html#Multi-Filesystem_I.2F0_with_Nfly_Mount_Points
 . It may work with Spark because it is underneath the FileSystem API ...



On Tue, May 21, 2024 at 10:03 PM Prem Sahoo 
mailto:prem.re...@gmail.com>> wrote:
I am looking for writer/comitter optimization which can make the spark write 
faster.

On Tue, May 21, 2024 at 9:15 PM eab...@163.com<mailto:eab...@163.com> 
mailto:eab...@163.com>> wrote:
Hi,
I think you should write to HDFS then copy file (parquet or orc) from HDFS 
to MinIO.


eabour

From: Prem Sahoo<mailto:prem.re...@gmail.com>
Date: 2024-05-22 00:38
To: Vibhor Gupta<mailto:vibhor.gu...@walmart.com>; 
user<mailto:user@spark.apache.org>
Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way


On Tue, May 21, 2024 at 6:58 AM Prem Sahoo 
mailto:prem.re...@gmail.com>> wrote:
Hello Vibhor,
Thanks for the suggestion .
I am looking for some other alternatives where I can use the same dataframe can 
be written to two destinations without re execution and cache or persist .

Can some one help me in scenario 2 ?
How to make spark write to MinIO faster ?
Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta 
mailto:vibhor.gu...@walmart.com>> wrote:


Hi Prem,

You can try to write to HDFS then read from HDFS and write to MinIO.

This will prevent duplicate transformation.

You can also try persisting the dataframe using the DISK_ONLY level.

Regards,
Vibhor
From: Prem Sahoo mailto:prem.re...@gmail.com>>
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list mailto:d...@spark.apache.org>>
Subject: EXT: Dual Write to HDFS and MinIO in faster way
EXTERNAL: Report suspicious emails to Email Abuse.
Hello Team,
I am planning to write to two datasource at the same time .

Scenario:-

Writing the same dataframe to HDFS and MinIO without re-executing the 
transformations and no cache(). Then how can we make it faster ?

Read the parquet file and do a few transformations and write to HDFS and MinIO.

here in both write spark needs execute the transformation again. Do we know how 
we can avoid re-execution of transformation  without cache()/persist ?

Scenario2 :-
I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
Do we have any way to make writing this faster ?

I don't want to do repartition and write as repartition will have overhead of 
shuffling .

Please provide some inputs.





Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
I will work on the first two possible causes.
For the third one, which I guess is the real problem, Spark treats the
testfile.csv object with the url s3a://test-bucket/testfile.csv as a bucket
to access _spark_metadata with url
s3a://test-bucket/testfile.csv/_spark_metadata
testfile.csv is an object and should not be treated as a bucket. But I am
not sure how to prevent Spark from doing that.


Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Mich Talebzadeh
ok

some observations


   - Spark job successfully lists the S3 bucket containing testfile.csv.
   - Spark job can retrieve the file size (33 Bytes) for testfile.csv.
   - Spark job fails to read the actual data from testfile.csv.
   - The printed content from testfile.csv is an empty list.
   - Spark logs show a debug message with an exception related to
   UserGroupInformation while trying to access the _spark_metadata file
   associated with testfile.csv.

possible causes


   - Permission Issues: Spark user (likely ubuntu based on logs) might lack
   the necessary permissions to access the testfile.csv file or the
   _spark_metadata file on S3 storage.
   - Spark Configuration: Issues with Spark's configuration for S3 access,
   such as missing credentials or incorrect security settings.
   - Spark attempting to read unnecessary files: The _spark_metadata file
   might not be essential for your current operation, and Spark's attempt to
   read it could be causing the issue.
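
Regarding the "Spark Configuration" cause above, a minimal sketch of the
fs.s3a.* settings that usually need to be present when pointing Spark at a
non-AWS, Ceph-based S3 endpoint; the endpoint and credentials below are
placeholders, not values from this thread:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("S3ReadTest")
         .config("spark.hadoop.fs.s3a.endpoint", "http://ceph-gateway:8080")  # placeholder
         .config("spark.hadoop.fs.s3a.access.key", "ACCESS_KEY")              # placeholder
         .config("spark.hadoop.fs.s3a.secret.key", "SECRET_KEY")              # placeholder
         .config("spark.hadoop.fs.s3a.path.style.access", "true")   # common for Ceph/RGW
         .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")  # if the endpoint is plain HTTP
         .getOrCreate())

df = spark.read.text("s3a://test-bucket/testfile.csv")
df.show(5, truncate=False)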


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 30 May 2024 at 22:29, Amin Mosayyebzadeh 
wrote:

> The code should read the testfile.csv file from S3 and print the content. It
> only prints an empty list although the file has content.
> I have also checked our custom s3 storage (Ceph based) logs and I see only
> LIST operations coming from Spark, there is no GET object operation for
> testfile.csv
>
> The only error I see in DEBUG output is these lines:
>
> =
> 24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
> from s3a://test-bucket/testfile.csv/_spark_metadata
> 24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
> (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
> java.lang.Exception
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
> at
> org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
> at
> org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:311)
> at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:352)
> at
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:64)
> at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:48)
> at
> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:91)
> at
> org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.(MetadataLogFileIndex.scala:52)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
> at
> org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
> at scala.Option.getOrElse(Option.scala:201)
> at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at
> org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> at py4j.Gateway.invoke(Gateway.java:282)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
> at py4j.ClientServerConnection.run(ClientSer

Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Amin Mosayyebzadeh
The code should read the testfile.csv file from S3 and print the content. It
only prints an empty list although the file has content.
I have also checked our custom s3 storage (Ceph based) logs and I see only
LIST operations coming from Spark, there is no GET object operation for
testfile.csv

The only error I see in DEBUG output is these lines:

=
24/05/30 15:39:21 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 15:39:21 DEBUG UserGroupInformation: PrivilegedAction [as: ubuntu
(auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@7af85238]
java.lang.Exception
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at
org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
at
org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:311)
at
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:352)
at
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
at
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:64)
at
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:48)
at
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:91)
at
org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.(MetadataLogFileIndex.scala:52)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:369)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:201)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at
org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:646)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)

===
I am not sure if this is related, since Spark can see and list the
bucket (it also returns the correct object size, which is 33 bytes).

Best,
Amin


On Thu, May 30, 2024 at 4:05 PM Mich Talebzadeh 
wrote:

> Hello,
>
> Overall, the exit code of 0 suggests a successful run of your Spark job.
> Analyze the intended purpose of your code and verify the output or Spark UI
> for further confirmation.
>
> 24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with
> exitCode 0.
>
> what to check
>
>
>1. Verify Output: If your Spark job was intended to read data from S3
>and process it, you will need to verify the output to ensure the data was
>handled correctly. This might involve checking if any results were written
>to a designated location or if any transformations were applied
>successfully.
>2. Review Code:
>3. Check Spark UI:
>
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh 
> wrote:
>
>> Hi Mich,
>>
>> Thank you for the help and sorry about the late reply.
>> I ran

Re: [s3a] Spark is not reading s3 object content

2024-05-30 Thread Mich Talebzadeh
Hello,

Overall, the exit code of 0 suggests a successful run of your Spark job.
Analyze the intended purpose of your code and verify the output or Spark UI
for further confirmation.

24/05/30 01:23:43 INFO SparkContext: SparkContext is stopping with exitCode
0.

what to check


   1. Verify Output: If your Spark job was intended to read data from S3
   and process it, you will need to verify the output to ensure the data was
   handled correctly. This might involve checking if any results were written
   to a designated location or if any transformations were applied
   successfully.
   2. Review Code:
   3. Check Spark UI:
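
A quick way to cover point 1 above, assuming the existing Spark session and
the same path used earlier in the thread (a sketch, not part of the original
checklist):

df = spark.read.text("s3a://test-bucket/testfile.csv")
print(df.count())            # should be greater than 0 if the object was actually read
df.show(5, truncate=False)   # eyeball the first few lines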


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Thu, 30 May 2024 at 11:56, Amin Mosayyebzadeh 
wrote:

> Hi Mich,
>
> Thank you for the help and sorry about the late reply.
> I ran the code you provided but I got "exitCode 0". Here is the complete output:
>
> ===
>
>
> 24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
> 24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic,
> amd64
> 24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
> 24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 24/05/30 01:23:38 INFO ResourceUtils:
> ==
> 24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
> spark.driver.
> 24/05/30 01:23:38 INFO ResourceUtils:
> ==
> 24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
> 24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
> executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
> , memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
> name: cpus, amount: 1.0)
> 24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
> 24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
> 24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
> 24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
> 24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: ubuntu; groups
> with view permissions: EMPTY; users with modify permissions: ubuntu; groups
> with modify permissions: EMPTY
> 24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver'
> on port 46321.
> 24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
> 24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
> BlockManagerMasterEndpoint up
> 24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
> 24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
> 24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8
> GiB
> 24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
> 24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
> 24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
> MOC-R4PAC08U33-S1C
> 24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
> 24/05/30 01:23:39 INFO Executor: Java version 11.0.22
> 24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
> (userClassPathFirst = false): ''
> 24/05/30 01:23:39 INFO Executor: Created or updated repl class loader
> org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
> 24/05/30 01:23:39 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
> 24/05/30 01:23:39 INFO NettyBlockTransferService: Server 

Re: [s3a] Spark is not reading s3 object content

2024-05-29 Thread Amin Mosayyebzadeh
Hi Mich,

Thank you for the help and sorry about the late reply.
I ran the code you provided but I got "exitCode 0". Here is the complete output:

===


24/05/30 01:23:38 INFO SparkContext: Running Spark version 3.5.0
24/05/30 01:23:38 INFO SparkContext: OS info Linux, 5.4.0-182-generic, amd64
24/05/30 01:23:38 INFO SparkContext: Java version 11.0.22
24/05/30 01:23:38 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
24/05/30 01:23:38 INFO ResourceUtils:
==
24/05/30 01:23:38 INFO ResourceUtils: No custom resources configured for
spark.driver.
24/05/30 01:23:38 INFO ResourceUtils:
==
24/05/30 01:23:38 INFO SparkContext: Submitted application: S3ReadTest
24/05/30 01:23:38 INFO ResourceProfile: Default ResourceProfile created,
executor resources: Map(cores -> name: cores, amount: 1, script: , vendor:
, memory -> name: memory, amount: 1024, script: , vendor: , offHeap ->
name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus ->
name: cpus, amount: 1.0)
24/05/30 01:23:38 INFO ResourceProfile: Limiting resource is cpu
24/05/30 01:23:38 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/05/30 01:23:38 INFO SecurityManager: Changing view acls to: ubuntu
24/05/30 01:23:38 INFO SecurityManager: Changing modify acls to: ubuntu
24/05/30 01:23:38 INFO SecurityManager: Changing view acls groups to:
24/05/30 01:23:38 INFO SecurityManager: Changing modify acls groups to:
24/05/30 01:23:38 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: ubuntu; groups
with view permissions: EMPTY; users with modify permissions: ubuntu; groups
with modify permissions: EMPTY
24/05/30 01:23:38 INFO Utils: Successfully started service 'sparkDriver' on
port 46321.
24/05/30 01:23:38 INFO SparkEnv: Registering MapOutputTracker
24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMaster
24/05/30 01:23:38 INFO BlockManagerMasterEndpoint: Using
org.apache.spark.storage.DefaultTopologyMapper for getting topology
information
24/05/30 01:23:38 INFO BlockManagerMasterEndpoint:
BlockManagerMasterEndpoint up
24/05/30 01:23:38 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/05/30 01:23:38 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-a1fc37d5-885a-4ed0-b8f2-4eeb930c69ee
24/05/30 01:23:38 INFO MemoryStore: MemoryStore started with capacity 2.8
GiB
24/05/30 01:23:38 INFO SparkEnv: Registering OutputCommitCoordinator
24/05/30 01:23:39 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
24/05/30 01:23:39 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
24/05/30 01:23:39 INFO Executor: Starting executor ID driver on host
MOC-R4PAC08U33-S1C
24/05/30 01:23:39 INFO Executor: OS info Linux, 5.4.0-182-generic, amd64
24/05/30 01:23:39 INFO Executor: Java version 11.0.22
24/05/30 01:23:39 INFO Executor: Starting executor with user classpath
(userClassPathFirst = false): ''
24/05/30 01:23:39 INFO Executor: Created or updated repl class loader
org.apache.spark.util.MutableURLClassLoader@a45f4d6 for default.
24/05/30 01:23:39 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 39343.
24/05/30 01:23:39 INFO NettyBlockTransferService: Server created on
MOC-R4PAC08U33-S1C:39343
24/05/30 01:23:39 INFO BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy
24/05/30 01:23:39 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManagerMasterEndpoint: Registering block
manager MOC-R4PAC08U33-S1C:39343 with 2.8 GiB RAM, BlockManagerId(driver,
MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO BlockManager: Initialized BlockManager:
BlockManagerId(driver, MOC-R4PAC08U33-S1C, 39343, None)
24/05/30 01:23:39 INFO SharedState: Setting hive.metastore.warehouse.dir
('null') to the value of spark.sql.warehouse.dir.
24/05/30 01:23:39 INFO SharedState: Warehouse path is
'file:/home/ubuntu/tpch-spark/spark-warehouse'.
24/05/30 01:23:40 WARN MetricsConfig: Cannot locate configuration: tried
hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/05/30 01:23:40 INFO MetricsSystemImpl: Scheduled Metric snapshot period
at 10 second(s).
24/05/30 01:23:40 INFO MetricsSystemImpl: s3a-file-system metrics system
started
24/05/30 01:23:41 INFO MetadataLogFileIndex: Reading streaming file log
from s3a://test-bucket/testfile.csv/_spark_metadata
24/05/30 01:23:41 INFO FileStreamSinkLog: BatchIds found from listing:
24/05/30 01:23:43 INFO FileSourceStrategy: Pushed Filters:
24/05/30 01:23:43 INFO FileSourceStrategy: Post-Scan Filters

Re: OOM concern

2024-05-28 Thread Perez
Thanks Mich for the detailed explanation.

On Tue, May 28, 2024 at 9:53 PM Mich Talebzadeh 
wrote:

> Russell mentioned some of these issues before. So in short your mileage
> varies. For a 100 GB data transfer, the speed difference between Glue and
> EMR might not be significant, especially considering the benefits of Glue's
> managed service aspects. However, for much larger datasets or scenarios
> where speed is critical, EMR's customization options might provide a slight
> edge.
>
> My recommendation is to test and compare: if speed is a concern, consider
> running a test job with both Glue and EMR (if feasible) on a smaller subset
> of your data to compare transfer times and costs in your specific
> environment. Focus on benefits: if the speed difference with Glue is
> minimal but it offers significant benefits in terms of management and cost
> for your use case, Glue might still be the preferable option. Also consider
> bandwidth: ensure your network bandwidth between the database and S3 is
> sufficient to handle the data transfer rate, regardless of the service you
> choose.
>
>
> HTH
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Tue, 28 May 2024 at 16:40, Perez  wrote:
>
>> Thanks Mich.
>>
>> Yes, I agree on the costing part, but how would the data transfer speed be
>> impacted? Is it because Glue takes some time to initialize the underlying
>> resources and then process the data?
>>
>>
>> On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Your mileage varies as usual
>>>
>>> Glue with DPUs seems like a strong contender for your data transfer
>>> needs based on the simplicity, scalability, and managed service aspects.
>>> However, if data transfer speed is critical or costs become a concern after
>>> testing, consider EMR as an alternative.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> PhD  Imperial
>>> College London 
>>> London, United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>>>
 Thank you everyone for your response.

 I am not getting any errors as of now. I am just trying to choose the
 right tool for my task which is data loading from an external source into
 s3 via Glue/EMR.

 I think Glue job would be the best fit for me because I can calculate
 DPUs needed (maybe keeping some extra buffer) so just wanted to check if
 there are any edge cases I need to consider.


 On Tue, May 28, 2024 at 5:39 AM Russell Jurney <
 russell.jur...@gmail.com> wrote:

> If you’re using EMR and Spark, you need to choose nodes with enough
> RAM to accommodate any given partition in your data or you can get an OOM
> error. Not sure if this job involves a reduce, but I would choose a single
> 128GB+ memory optimized instance and then adjust parallelism as via the
> Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
> job.
>
> Thanks,
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>
>> Hi Team,
>>
>> I want to extract the data from DB and just dump it into S3. I
>> don't have to perform any transformations on the data yet. My data size
>> would be ~100 GB (historical load).
>>
>> Choosing the right DPUs(Glue jobs) should solve this problem right?
>> Or should I move to EMR.
>>
>> I don't feel th

Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-28 Thread Gera Shegalov
I agree with the previous answers that (if requirements allow it) it is
much easier to just orchestrate a copy, either in the same app or synced
externally.

A long time ago, and not for a Spark app, we were solving a similar use case
via
https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/ViewFs.html#Multi-Filesystem_I.2F0_with_Nfly_Mount_Points
. It may work with Spark because it sits underneath the FileSystem API ...
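
For illustration, a minimal PySpark sketch of the simpler "copy in the same
app" route. The paths, the transformation, and the s3a-backed MinIO location
are assumptions, not from this thread:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dual-write-sketch").getOrCreate()

# Hypothetical locations; MinIO is addressed through the s3a connector.
hdfs_out = "hdfs://namenode:8020/warehouse/events"
minio_out = "s3a://events-bucket/warehouse/events"

df = (spark.read.parquet("hdfs://namenode:8020/raw/events")
          .filter("event_date = '2024-05-21'"))   # stand-in for the real transformations

# 1) Materialise the result once, on HDFS.
df.write.mode("overwrite").parquet(hdfs_out)

# 2) Copy the already-materialised files to MinIO by reading them back:
#    the transformations above are not re-executed, Spark only moves bytes.
spark.read.parquet(hdfs_out).write.mode("overwrite").parquet(minio_out)
```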



On Tue, May 21, 2024 at 10:03 PM Prem Sahoo  wrote:

> I am looking for writer/comitter optimization which can make the spark
> write faster.
>
> On Tue, May 21, 2024 at 9:15 PM eab...@163.com  wrote:
>
>> Hi,
>> I think you should write to HDFS then copy file (parquet or orc)
>> from HDFS to MinIO.
>>
>> --
>> eabour
>>
>>
>> *From:* Prem Sahoo 
>> *Date:* 2024-05-22 00:38
>> *To:* Vibhor Gupta ; user
>> 
>> *Subject:* Re: EXT: Dual Write to HDFS and MinIO in faster way
>>
>>
>> On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
>>
>>> Hello Vibhor,
>>> Thanks for the suggestion .
>>> I am looking for some other alternatives where I can use the same
>>> dataframe can be written to two destinations without re execution and cache
>>> or persist .
>>>
>>> Can some one help me in scenario 2 ?
>>> How to make spark write to MinIO faster ?
>>> Sent from my iPhone
>>>
>>> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
>>> wrote:
>>>
>>> 
>>>
>>> Hi Prem,
>>>
>>>
>>>
>>> You can try to write to HDFS then read from HDFS and write to MinIO.
>>>
>>>
>>>
>>> This will prevent duplicate transformation.
>>>
>>>
>>>
>>> You can also try persisting the dataframe using the DISK_ONLY level.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Vibhor
>>>
>>> *From: *Prem Sahoo 
>>> *Date: *Tuesday, 21 May 2024 at 8:16 AM
>>> *To: *Spark dev list 
>>> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>>>
>>> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>>>
>>> Hello Team,
>>>
>>> I am planning to write to two datasource at the same time .
>>>
>>>
>>>
>>> Scenario:-
>>>
>>>
>>>
>>> Writing the same dataframe to HDFS and MinIO without re-executing the
>>> transformations and no cache(). Then how can we make it faster ?
>>>
>>>
>>>
>>> Read the parquet file and do a few transformations and write to HDFS and
>>> MinIO.
>>>
>>>
>>>
>>> here in both write spark needs execute the transformation again. Do we
>>> know how we can avoid re-execution of transformation  without
>>> cache()/persist ?
>>>
>>>
>>>
>>> Scenario2 :-
>>>
>>> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>>>
>>> Do we have any way to make writing this faster ?
>>>
>>>
>>>
>>> I don't want to do repartition and write as repartition will have
>>> overhead of shuffling .
>>>
>>>
>>>
>>> Please provide some inputs.
>>>
>>>
>>>
>>>
>>>
>>>


Re: OOM concern

2024-05-28 Thread Russell Jurney
If Glue lets you take a configuration-based approach and you don't have to
operate any servers (as you would with EMR), use Glue. Try EMR if that proves troublesome.

Russ

On Tue, May 28, 2024 at 9:23 AM Mich Talebzadeh 
wrote:

> Russell mentioned some of these issues before. So in short your mileage
> varies. For a 100 GB data transfer, the speed difference between Glue and
> EMR might not be significant, especially considering the benefits of Glue's
> managed service aspects. However, for much larger datasets or scenarios
> where speed is critical, EMR's customization options might provide a slight
> edge.
>
> My recommendation is test and Compare: If speed is a concern, consider
> running a test job with both Glue and EMR (if feasible) on a smaller subset
> of your data to compare transfer times and costs in your specific
> environment.. Focus on Benefits: If the speed difference with Glue is
> minimal but it offers significant benefits in terms of management and cost
> for your use case, Glue might still be the preferable option.. Also
> bandwidth: Ensure your network bandwidth between the database and S3 is
> sufficient to handle the data transfer rate, regardless of the service you
> choose.
>
>
> HTH
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Tue, 28 May 2024 at 16:40, Perez  wrote:
>
>> Thanks Mich.
>>
>> Yes, I agree on the costing part but how does the data transfer speed be
>> impacted? Is it because glue takes some time to initialize underlying
>> resources and then process the data?
>>
>>
>> On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Your mileage varies as usual
>>>
>>> Glue with DPUs seems like a strong contender for your data transfer
>>> needs based on the simplicity, scalability, and managed service aspects.
>>> However, if data transfer speed is critical or costs become a concern after
>>> testing, consider EMR as an alternative.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> PhD  Imperial
>>> College London 
>>> London, United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>>>
 Thank you everyone for your response.

 I am not getting any errors as of now. I am just trying to choose the
 right tool for my task which is data loading from an external source into
 s3 via Glue/EMR.

 I think Glue job would be the best fit for me because I can calculate
 DPUs needed (maybe keeping some extra buffer) so just wanted to check if
 there are any edge cases I need to consider.


 On Tue, May 28, 2024 at 5:39 AM Russell Jurney <
 russell.jur...@gmail.com> wrote:

> If you’re using EMR and Spark, you need to choose nodes with enough
> RAM to accommodate any given partition in your data or you can get an OOM
> error. Not sure if this job involves a reduce, but I would choose a single
> 128GB+ memory optimized instance and then adjust parallelism as via the
> Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
> job.
>
> Thanks,
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>
>> Hi Team,
>>
>> I want to extract the data from DB and just dump it into S3. I
>> don't have to perform any transformations on the data yet. My data size
>> would be ~100 GB (historical load).
>>
>> Choosing the

Re: OOM concern

2024-05-28 Thread Mich Talebzadeh
Russell mentioned some of these issues before, so in short, your mileage
varies. For a 100 GB data transfer, the speed difference between Glue and
EMR might not be significant, especially considering the benefits of Glue's
managed service aspects. However, for much larger datasets or scenarios
where speed is critical, EMR's customization options might provide a slight
edge.

My recommendation is to test and compare: if speed is a concern, consider
running a test job with both Glue and EMR (if feasible) on a smaller subset
of your data to compare transfer times and costs in your specific
environment. Focus on benefits: if the speed difference with Glue is
minimal but it offers significant benefits in terms of management and cost
for your use case, Glue might still be the preferable option. Also consider
bandwidth: ensure your network bandwidth between the database and S3 is
sufficient to handle the data transfer rate, regardless of the service you
choose.


HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 28 May 2024 at 16:40, Perez  wrote:

> Thanks Mich.
>
> Yes, I agree on the costing part but how does the data transfer speed be
> impacted? Is it because glue takes some time to initialize underlying
> resources and then process the data?
>
>
> On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh 
> wrote:
>
>> Your mileage varies as usual
>>
>> Glue with DPUs seems like a strong contender for your data transfer needs
>> based on the simplicity, scalability, and managed service aspects. However,
>> if data transfer speed is critical or costs become a concern after testing,
>> consider EMR as an alternative.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>> PhD  Imperial
>> College London 
>> London, United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>>
>>> Thank you everyone for your response.
>>>
>>> I am not getting any errors as of now. I am just trying to choose the
>>> right tool for my task which is data loading from an external source into
>>> s3 via Glue/EMR.
>>>
>>> I think Glue job would be the best fit for me because I can calculate
>>> DPUs needed (maybe keeping some extra buffer) so just wanted to check if
>>> there are any edge cases I need to consider.
>>>
>>>
>>> On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
>>> wrote:
>>>
 If you’re using EMR and Spark, you need to choose nodes with enough RAM
 to accommodate any given partition in your data or you can get an OOM
 error. Not sure if this job involves a reduce, but I would choose a single
 128GB+ memory optimized instance and then adjust parallelism as via the
 Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
 job.

 Thanks,
 Russell Jurney @rjurney 
 russell.jur...@gmail.com LI  FB
  datasyndrome.com


 On Mon, May 27, 2024 at 9:15 AM Perez  wrote:

> Hi Team,
>
> I want to extract the data from DB and just dump it into S3. I
> don't have to perform any transformations on the data yet. My data size
> would be ~100 GB (historical load).
>
> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
> should I move to EMR.
>
> I don't feel the need to move to EMR but wanted the expertise
> suggestions.
>
> TIA.
>



Re: OOM concern

2024-05-28 Thread Perez
Thanks Mich.

Yes, I agree on the costing part, but how would the data transfer speed be
impacted? Is it because Glue takes some time to initialize the underlying
resources and then process the data?


On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh 
wrote:

> Your mileage varies as usual
>
> Glue with DPUs seems like a strong contender for your data transfer needs
> based on the simplicity, scalability, and managed service aspects. However,
> if data transfer speed is critical or costs become a concern after testing,
> consider EMR as an alternative.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD  Imperial College
> London 
> London, United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>
>> Thank you everyone for your response.
>>
>> I am not getting any errors as of now. I am just trying to choose the
>> right tool for my task which is data loading from an external source into
>> s3 via Glue/EMR.
>>
>> I think Glue job would be the best fit for me because I can calculate
>> DPUs needed (maybe keeping some extra buffer) so just wanted to check if
>> there are any edge cases I need to consider.
>>
>>
>> On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
>> wrote:
>>
>>> If you’re using EMR and Spark, you need to choose nodes with enough RAM
>>> to accommodate any given partition in your data or you can get an OOM
>>> error. Not sure if this job involves a reduce, but I would choose a single
>>> 128GB+ memory optimized instance and then adjust parallelism as via the
>>> Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
>>> job.
>>>
>>> Thanks,
>>> Russell Jurney @rjurney 
>>> russell.jur...@gmail.com LI  FB
>>>  datasyndrome.com
>>>
>>>
>>> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>>>
 Hi Team,

 I want to extract the data from DB and just dump it into S3. I
 don't have to perform any transformations on the data yet. My data size
 would be ~100 GB (historical load).

 Choosing the right DPUs(Glue jobs) should solve this problem right? Or
 should I move to EMR.

 I don't feel the need to move to EMR but wanted the expertise
 suggestions.

 TIA.

>>>


Re: OOM concern

2024-05-28 Thread Mich Talebzadeh
Your mileage varies as usual

Glue with DPUs seems like a strong contender for your data transfer needs
based on the simplicity, scalability, and managed service aspects. However,
if data transfer speed is critical or costs become a concern after testing,
consider EMR as an alternative.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 28 May 2024 at 09:04, Perez  wrote:

> Thank you everyone for your response.
>
> I am not getting any errors as of now. I am just trying to choose the
> right tool for my task which is data loading from an external source into
> s3 via Glue/EMR.
>
> I think Glue job would be the best fit for me because I can calculate DPUs
> needed (maybe keeping some extra buffer) so just wanted to check if there
> are any edge cases I need to consider.
>
>
> On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
> wrote:
>
>> If you’re using EMR and Spark, you need to choose nodes with enough RAM
>> to accommodate any given partition in your data or you can get an OOM
>> error. Not sure if this job involves a reduce, but I would choose a single
>> 128GB+ memory optimized instance and then adjust parallelism as via the
>> Dpark docs using pyspark.sql.DataFrame.repartition(n) at the start of your
>> job.
>>
>> Thanks,
>> Russell Jurney @rjurney 
>> russell.jur...@gmail.com LI  FB
>>  datasyndrome.com
>>
>>
>> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>>
>>> Hi Team,
>>>
>>> I want to extract the data from DB and just dump it into S3. I
>>> don't have to perform any transformations on the data yet. My data size
>>> would be ~100 GB (historical load).
>>>
>>> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
>>> should I move to EMR.
>>>
>>> I don't feel the need to move to EMR but wanted the expertise
>>> suggestions.
>>>
>>> TIA.
>>>
>>


Re: OOM concern

2024-05-27 Thread Perez
Thank you everyone for your responses.

I am not getting any errors as of now. I am just trying to choose the right
tool for my task, which is loading data from an external source into S3 via
Glue/EMR.

I think a Glue job would be the best fit for me because I can calculate the
DPUs needed (maybe keeping some extra buffer), so I just wanted to check if
there are any edge cases I need to consider.


On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
wrote:

> If you’re using EMR and Spark, you need to choose nodes with enough RAM to
> accommodate any given partition in your data or you can get an OOM error.
> Not sure if this job involves a reduce, but I would choose a single 128GB+
> memory optimized instance and then adjust parallelism as via the Dpark docs
> using pyspark.sql.DataFrame.repartition(n) at the start of your job.
>
> Thanks,
> Russell Jurney @rjurney 
> russell.jur...@gmail.com LI  FB
>  datasyndrome.com
>
>
> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>
>> Hi Team,
>>
>> I want to extract the data from DB and just dump it into S3. I
>> don't have to perform any transformations on the data yet. My data size
>> would be ~100 GB (historical load).
>>
>> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
>> should I move to EMR.
>>
>> I don't feel the need to move to EMR but wanted the expertise suggestions.
>>
>> TIA.
>>
>


Re: Spark Protobuf Deserialization

2024-05-27 Thread Sandish Kumar HN
Did you try using to_protobuf and from_protobuf ?

https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html
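
A rough PySpark sketch of what that can look like for the schema quoted
below, assuming an existing SparkSession; the descriptor file, broker
address, topic name and the "Click" event type are all assumptions:

```python
from pyspark.sql.functions import col, explode
from pyspark.sql.protobuf.functions import from_protobuf

desc_file = "/path/to/events.desc"   # compiled with protoc --descriptor_set_out

raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")
       .option("subscribe", "requests")
       .load())

# Decode the outer Request message and get one row per inner Event.
events = (raw
          .select(from_protobuf(col("value"), "Request",
                                descFilePath=desc_file).alias("req"))
          .select(col("req.sent_ts").alias("sent_ts"),
                  explode(col("req.event")).alias("event")))

# Route by event_name inside the same job instead of republishing to Kafka,
# e.g. decode one (hypothetical) event type:
clicks = (events
          .filter(col("event.event_name") == "Click")
          .select(from_protobuf(col("event.event_bytes"), "Click",
                                descFilePath=desc_file).alias("click")))
```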


On Mon, May 27, 2024 at 15:45 Satyam Raj  wrote:

> Hello guys,
> We're using Spark 3.5.0 for processing Kafka source that contains protobuf
> serialized data. The format is as follows:
>
> message Request {
>   long sent_ts = 1;
>   Event[] event = 2;
> }
>
> message Event {
>  string event_name = 1;
>  bytes event_bytes = 2;
> }
>
> The event_bytes contains the data for the event_name. event_name is the
> className of the Protobuf class.
> Currently, we parse the protobuf message from the Kafka topic, and for
> every event in the array, push the event_bytes to the `event_name` topic,
> over which spark jobs run and use the same event_name protobuf class to
> deserialize the data.
>
> Is there a better way to do all this in a single job?
>


Re: OOM concern

2024-05-27 Thread Russell Jurney
If you’re using EMR and Spark, you need to choose nodes with enough RAM to
accommodate any given partition in your data, or you can get an OOM error.
Not sure if this job involves a reduce, but I would choose a single 128GB+
memory-optimized instance and then adjust parallelism per the PySpark docs,
using pyspark.sql.DataFrame.repartition(n) at the start of your job.
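
For the extract-and-dump case discussed in this thread, a hedged sketch of a
partitioned JDBC read plus an up-front repartition, assuming an existing
SparkSession; the connection details, table and partitioning column are made
up:

```python
df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/sales")   # hypothetical source
      .option("dbtable", "public.orders")
      .option("user", "etl_user")
      .option("password", "change-me")
      .option("partitionColumn", "order_id")    # numeric or date/timestamp column
      .option("lowerBound", "1")
      .option("upperBound", "50000000")
      .option("numPartitions", "64")            # parallel JDBC readers
      .load())

# Rebalance before writing if the JDBC partitions come out skewed.
df.repartition(64).write.mode("overwrite").parquet("s3a://my-bucket/orders/")
```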

Thanks,
Russell Jurney @rjurney 
russell.jur...@gmail.com LI  FB
 datasyndrome.com


On Mon, May 27, 2024 at 9:15 AM Perez  wrote:

> Hi Team,
>
> I want to extract the data from DB and just dump it into S3. I
> don't have to perform any transformations on the data yet. My data size
> would be ~100 GB (historical load).
>
> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
> should I move to EMR.
>
> I don't feel the need to move to EMR but wanted the expertise suggestions.
>
> TIA.
>


Re: OOM concern

2024-05-27 Thread Meena Rajani
What exactly is the error? Is it erroring out while reading the data from
the DB? How are you partitioning the data?

How much memory do you currently have? What is the network timeout?

Regards,
Meena


On Mon, May 27, 2024 at 4:22 PM Perez  wrote:

> Hi Team,
>
> I want to extract the data from DB and just dump it into S3. I
> don't have to perform any transformations on the data yet. My data size
> would be ~100 GB (historical load).
>
> Choosing the right DPUs(Glue jobs) should solve this problem right? Or
> should I move to EMR.
>
> I don't feel the need to move to EMR but wanted the expertise suggestions.
>
> TIA.
>


Re: [Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?

2024-05-27 Thread Mich Talebzadeh
When you use applyInPandasWithState, Spark processes each input row as it
arrives, regardless of whether certain columns, such as the timestamp
column, contain NULL values. This behavior is useful when you want to
handle incomplete or missing data gracefully. Because NULL timestamps still
trigger calls to the stateful function, you can implement custom handling
strategies there, such as skipping incomplete records.


However, it is important to understand that this behavior also *means that
the watermark is not advanced for NULL timestamps*. The watermark is used
for event-time processing in Spark Structured Streaming: it tracks the
progress of event time in your data stream and is typically based on the
timestamp column. Since NULL timestamps do not contribute to watermark
advancement, any logic that depends on the watermark (such as dropping late
data or timing out state) will not take these records into account.

Regarding whether you can rely on this behavior for your production code,
it largely depends on your requirements and use case. If your application
logic is designed to handle NULL timestamps appropriately and you have
tested it to ensure it behaves as expected, then you can generally rely on
this behavior. FYI, I have not tested it myself, so I cannot provide a
definitive answer.
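
As a minimal sketch of handling NULL timestamps explicitly inside the
stateful function rather than relying on the behaviour described above;
the column names, schemas and counting logic are illustrative only, and an
existing streaming DataFrame `events` (columns id, ts, ...) is assumed:

```python
import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

output_schema = "id STRING, seen LONG, null_ts LONG"
state_schema = "seen LONG, null_ts LONG"

def count_events(key: Tuple[str],
                 pdfs: Iterator[pd.DataFrame],
                 state: GroupState) -> Iterator[pd.DataFrame]:
    seen, null_ts = state.get if state.exists else (0, 0)
    for pdf in pdfs:
        seen += len(pdf)
        # Rows with a NULL event time still reach this function; count (or
        # drop) them explicitly instead of assuming anything about them.
        null_ts += int(pdf["ts"].isna().sum())
    state.update((seen, null_ts))
    yield pd.DataFrame({"id": [key[0]], "seen": [seen], "null_ts": [null_ts]})

result = (events
          .withWatermark("ts", "10 minutes")
          .groupBy("id")
          .applyInPandasWithState(count_events, output_schema, state_schema,
                                  "Update", GroupStateTimeout.NoTimeout))
```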

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD  Imperial College
London 
London, United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 27 May 2024 at 22:04, Juan Casse  wrote:

> I am using applyInPandasWithState in PySpark 3.5.0.
>
> I noticed that records with timestamp==NULL are processed (i.e., trigger a
> call to the stateful function). And, as you would expect, does not advance
> the watermark.
>
> I am taking advantage of this in my application.
>
> My question: Is this a supported feature of Spark? Can I rely on this
> behavior for my production code?
>
> Thanks,
> Juan
>


Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Shay Elbaz
Seen this before; we had a very(!) complex plan behind a DataFrame, to the point 
where any additional transformation went OOM on the driver.

A quick and ugly solution was to break the plan: convert the DataFrame to an RDD 
and back to a DataFrame at certain points to make the plan shorter. This has obvious 
drawbacks and is not recommended in general, but at least we had something 
working. The real, long-term solution was to replace the many (> 200) 
withColumn() calls with only a few select() calls. You can easily find sources on 
the internet for why this is better. (It was on Spark 2.3, but I think the main 
principles remain.) TBH, it was easier than I expected, as it mainly involved 
moving pieces of code from one place to another, not a "real", meaningful 
refactoring.
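
A tiny sketch of both ideas; the column names and expressions are invented
for illustration, and an existing SparkSession `spark` and DataFrame `df`
are assumed:

```python
from pyspark.sql import functions as F

derived = {   # stand-ins for the real >200 derived columns
    "amount_usd": F.col("amount") * F.col("fx_rate"),
    "event_date": F.to_date("event_ts"),
}

# Deep-plan version: one extra plan node per call.
# for name, expr in derived.items():
#     df = df.withColumn(name, expr)

# Flat version: a single select keeps the plan shallow.
df = df.select("*", *[expr.alias(name) for name, expr in derived.items()])

# The quick-and-ugly plan break mentioned above: round-trip through an RDD so
# downstream operations start from a fresh plan (costs a serialization pass
# and loses optimizations across the boundary).
df = spark.createDataFrame(df.rdd, schema=df.schema)
```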



From: Mich Talebzadeh 
Sent: Monday, May 27, 2024 15:43
Cc: user@spark.apache.org 
Subject: Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame 
Processing


This message contains hyperlinks, take precaution before opening these links.

Few ideas on top of my head for how to go about solving the problem


  1.  Try with subsets: Try reproducing the issue with smaller subsets of your 
data to pinpoint the specific operation causing the memory problems.
  2.  Explode or Flatten Nested Structures: If your DataFrame schema involves 
deep nesting, consider using techniques like explode or flattening to transform 
it into a less nested structure. This can reduce memory usage during operations 
like withColumn.
  3.  Lazy Evaluation: Use select before withColumn: this ensures lazy 
evaluation, meaning Spark only materializes the data when necessary. This can 
improve memory usage compared to directly calling withColumn on the entire 
DataFrame.
  4.  spark.sql.shuffle.partitions: Setting this configuration to a value close 
to the number of executors can improve shuffle performance and potentially 
reduce memory usage.
  5.  Spark UI Monitoring: Utilize the Spark UI to monitor memory usage 
throughout your job execution and identify potential memory bottlenecks.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions (Werner 
<https://en.wikipedia.org/wiki/Wernher_von_Braun> Von 
Braun<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".



On Mon, 27 May 2024 at 12:50, Gaurav Madan 
 wrote:
Dear Community,

I'm reaching out to seek your assistance with a memory issue we've been facing 
while processing certain large and nested DataFrames using Apache Spark. We 
have encountered a scenario where the driver runs out of memory when applying 
the `withColumn` method on specific DataFrames in Spark 3.4.1. However, the 
same DataFrames are processed successfully in Spark 2.4.0.

Problem Summary:
For certain DataFrames, applying the `withColumn` method in Spark 3.4.1 causes 
the driver to choke and run out of memory. However, the same DataFrames are 
processed successfully in Spark 2.4.0.

Heap Dump Analysis:
We performed a heap dump analysis after enabling heap dump on out-of-memory 
errors, and the analysis revealed the following significant frames and local 
variables:

```

org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:4273)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:1622)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2820)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2759)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
 (DataPersistenceUtil.scala:88)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.

Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Mich Talebzadeh
Few ideas on top of my head for how to go about solving the problem


   1. Try with subsets: Try reproducing the issue with smaller subsets of
   your data to pinpoint the specific operation causing the memory problems.
   2. Explode or Flatten Nested Structures: If your DataFrame schema
   involves deep nesting, consider using techniques like explode or flattening
   to transform it into a less nested structure (a short sketch follows this
   list). This can reduce memory usage during operations like withColumn.
   3. Lazy Evaluation: Use select before withColumn: this ensures lazy
   evaluation, meaning Spark only materializes the data when necessary. This
   can improve memory usage compared to directly calling withColumn on the
   entire DataFrame.
   4. spark.sql.shuffle.partitions: Setting this configuration to a value
   close to the number of executors can improve shuffle performance and
   potentially reduce memory usage.
   5. Spark UI Monitoring: Utilize the Spark UI to monitor memory usage
   throughout your job execution and identify potential memory bottlenecks.
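
A short sketch of points 2 and 4, with a made-up nested schema (id,
payload.meta.source, events as an array of structs), an assumed cluster
size, and an existing SparkSession `spark` and DataFrame `df`:

```python
from pyspark.sql import functions as F

# Point 4: align shuffle partitions with the cluster (200 is an assumed value).
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Point 2: flatten the nested structure before the heavy withColumn work.
flat = (df
        .withColumn("event", F.explode("events"))          # one row per array element
        .select("id",
                F.col("payload.meta.source").alias("source"),
                F.col("event.name").alias("event_name"),
                F.col("event.ts").alias("event_ts")))
```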

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".



On Mon, 27 May 2024 at 12:50, Gaurav Madan
 wrote:

> Dear Community,
>
> I'm reaching out to seek your assistance with a memory issue we've been
> facing while processing certain large and nested DataFrames using Apache
> Spark. We have encountered a scenario where the driver runs out of memory
> when applying the `withColumn` method on specific DataFrames in Spark
> 3.4.1. However, the same DataFrames are processed successfully in Spark
> 2.4.0.
>
>
> *Problem Summary:*For certain DataFrames, applying the `withColumn`
> method in Spark 3.4.1 causes the driver to choke and run out of memory.
> However, the same DataFrames are processed successfully in Spark 2.4.0.
>
>
> *Heap Dump Analysis:*We performed a heap dump analysis after enabling
> heap dump on out-of-memory errors, and the analysis revealed the following
> significant frames and local variables:
>
> ```
>
> org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:4273)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:1622)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:2820)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:2759)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
> (DataPersistenceUtil.scala:88)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
> (DataPersistenceUtil.scala:19)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
> (BronzeStep.scala:23)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V
> (MainJob.scala:78)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.MainJob$.processBatchCB(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lcom/urbanclap/dp/factory/output/Event;Lorg/apache/spark/sql/Dataset;)V
> (MainJob.sc

Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Sorry, I thought I gave an explanation.

The issue you are encountering with incorrect record numbers in the
"ShuffleWrite Size/Records" column in the Spark DAG UI when data is read
from cache/persist is a known limitation. This discrepancy arises due to
the way Spark handles and reports shuffle data when caching is involved.

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Sun, 26 May 2024 at 21:16, Prem Sahoo  wrote:

> Can anyone please assist me ?
>
> On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:
>
>> Does anyone have a clue ?
>>
>> On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:
>>
>>> Hello Team,
>>> in spark DAG UI , we have Stages tab. Once you click on each stage you
>>> can view the tasks.
>>>
>>> In each task we have a column "ShuffleWrite Size/Records " that column
>>> prints wrong data when it gets the data from cache/persist . it
>>> typically will show the wrong record number though the data size is correct
>>> for e.g  3.2G/ 7400 which is wrong .
>>>
>>> please advise.
>>>
>>


Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Just to further clarify: the Shuffle Write Size/Records column in
the Spark UI can be misleading when working with cached/persisted data
because it reflects the shuffled data size and record count, not the
entire cached/persisted dataset. So it is fair to say that this is a
limitation of the UI's display, not necessarily a bug in the Spark
framework itself.

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".



On Sun, 26 May 2024 at 16:45, Mich Talebzadeh  wrote:
>
> Yep, the Spark UI's Shuffle Write Size/Records" column can sometimes show 
> incorrect record counts when data is retrieved from cache or persisted data. 
> This happens because the record count reflects the number of records written 
> to disk for shuffling, and not the actual number of records in the cached or 
> persisted data itself. Add to it, because of lazy evaluation:, Spark may only 
> materialize a portion of the cached or persisted data when a task needs it. 
> The "Shuffle Write Size/Records" might only reflect the materialized portion, 
> not the total number of records in the cache/persistence. While the "Shuffle 
> Write Size/Records" might be inaccurate for cached/persisted data, the 
> "Shuffle Read Size/Records" column can be more reliable. This metric shows 
> the number of records read from shuffle by the following stage, which should 
> be closer to the actual number of records processed.
>
> HTH
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my knowledge 
> but of course cannot be guaranteed . It is essential to note that, as with 
> any advice, quote "one test result is worth one-thousand expert opinions 
> (Werner Von Braun)".
>
>
>
> On Thu, 23 May 2024 at 17:45, Prem Sahoo  wrote:
>>
>> Hello Team,
>> in spark DAG UI , we have Stages tab. Once you click on each stage you can 
>> view the tasks.
>>
>> In each task we have a column "ShuffleWrite Size/Records " that column 
>> prints wrong data when it gets the data from cache/persist . it typically 
>> will show the wrong record number though the data size is correct for e.g  
>> 3.2G/ 7400 which is wrong .
>>
>> please advise.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


