Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Bjørn Jørgensen
Also remove the space in rev. scode

søn. 22. okt. 2023 kl. 19:08 skrev Sadha Chilukoori :

> Hi Meena,
>
> I'm asking to clarify, are the *on *& *and* keywords optional in the join
> conditions?
>
> Please try this snippet, and see if it helps
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> on rev.sys = p.sys
> and rev.prin = p.prin
> and rev.scode= p.bcode
>
> left join item I
> on rev.sys = I.sys
> and rev.custumer_id = I.custumer_id
> and rev. scode = I.scode;
>
> Thanks,
> Sadha
>
> On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani 
> wrote:
>
>> Hello all:
>>
>> I am using spark sql to join two tables. To my surprise I am
>> getting redundant rows. What could be the cause.
>>
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> rev.sys = p.sys
>> rev.prin = p.prin
>> rev.scode= p.bcode
>>
>> left join item I
>> on rev.sys = i.sys
>> rev.custumer_id = I.custumer_id
>> rev. scode = I.scode
>>
>> where rev.custumer_id = '123456789'
>>
>> The first part of the code brings one row
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> rev.sys = p.sys
>> rev.prin = p.prin
>> rev.scode= p.bcode
>>
>>
>> The  item has two rows which have common attributes  and the* final join
>> should result in 2 rows. But I am seeing 4 rows instead.*
>>
>> left join item I
>> on rev.sys = i.sys
>> rev.custumer_id = I.custumer_id
>> rev. scode = I.scode
>>
>>
>>
>> Regards,
>> Meena
>>
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Sadha Chilukoori
Hi Meena,

I'm asking to clarify: are the *on* & *and* keywords optional in the join
conditions?

Please try this snippet, and see if it helps

select rev.* from rev
inner join customer c
on rev.custumer_id =c.id
inner join product p
on rev.sys = p.sys
and rev.prin = p.prin
and rev.scode= p.bcode

left join item I
on rev.sys = I.sys
and rev.custumer_id = I.custumer_id
and rev. scode = I.scode;

Thanks,
Sadha

On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani  wrote:

> Hello all:
>
> I am using spark sql to join two tables. To my surprise I am
> getting redundant rows. What could be the cause.
>
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
> where rev.custumer_id = '123456789'
>
> The first part of the code brings one row
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
>
> The  item has two rows which have common attributes  and the* final join
> should result in 2 rows. But I am seeing 4 rows instead.*
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
>
>
> Regards,
> Meena
>
>
>


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Patrick Tucci
Hi Meena,

It's not impossible, but it's unlikely that there's a bug in Spark SQL
randomly duplicating rows. The most likely explanation is that there are more
records in the item table that match your sys/custumer_id/scode criteria
than you expect.

In your original query, try changing select rev.* to select I.*. This will
show you the records from item that the join produces. If the first part of
the code only returns one record, I expect you will see 4 distinct records
returned here.
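
As a concrete sketch of that diagnostic (the tables, columns, join conditions and customer id are taken from the thread, with the on/and keywords added; the SparkSession setup is an assumption):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-diagnostic").getOrCreate()

# Select the item side of the join so the matching rows become visible.
diagnostic_sql = """
SELECT i.*
FROM rev
INNER JOIN customer c
  ON rev.custumer_id = c.id
INNER JOIN product p
  ON rev.sys = p.sys
 AND rev.prin = p.prin
 AND rev.scode = p.bcode
LEFT JOIN item i
  ON rev.sys = i.sys
 AND rev.custumer_id = i.custumer_id
 AND rev.scode = i.scode
WHERE rev.custumer_id = '123456789'
"""
spark.sql(diagnostic_sql).show(truncate=False)  # count how many item rows actually match
```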

Thanks,

Patrick


On Sun, Oct 22, 2023 at 1:29 AM Meena Rajani  wrote:

> Hello all:
>
> I am using spark sql to join two tables. To my surprise I am
> getting redundant rows. What could be the cause.
>
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
> where rev.custumer_id = '123456789'
>
> The first part of the code brings one row
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
>
> The  item has two rows which have common attributes  and the* final join
> should result in 2 rows. But I am seeing 4 rows instead.*
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
>
>
> Regards,
> Meena
>
>
>


automatically/dynamically renew AWS temporary token

2023-10-22 Thread Carlos Aguni
Hi all,

I have a scenario where I need to assume a cross-account role to have S3 bucket 
access.

The problem is that this role only allows for a 1h time span (no negotiation).

That said, does anyone know a way to tell Spark to automatically renew the token,
or to dynamically renew the token on each node?
I'm currently using Spark on AWS Glue.

I wonder what options I have.

Regards,
c.


Spark join produce duplicate rows in resultset

2023-10-21 Thread Meena Rajani
Hello all:

I am using Spark SQL to join two tables. To my surprise I am
getting redundant rows. What could be the cause?


select rev.* from rev
inner join customer c
on rev.custumer_id =c.id
inner join product p
rev.sys = p.sys
rev.prin = p.prin
rev.scode= p.bcode

left join item I
on rev.sys = i.sys
rev.custumer_id = I.custumer_id
rev. scode = I.scode

where rev.custumer_id = '123456789'

The first part of the code brings one row

select rev.* from rev
inner join customer c
on rev.custumer_id =c.id
inner join product p
rev.sys = p.sys
rev.prin = p.prin
rev.scode= p.bcode


The item table has two rows which have common attributes, and the *final join
should result in 2 rows. But I am seeing 4 rows instead.*

left join item I
on rev.sys = i.sys
rev.custumer_id = I.custumer_id
rev. scode = I.scode



Regards,
Meena


Error when trying to get the data from Hive Materialized View

2023-10-21 Thread Siva Sankar Reddy
Hi Team ,

We are not getting any error when retrieving the data from a Hive table in
PySpark, but we are getting the error (scala.MatchError: MATERIALIZED_VIEW (of
class org.apache.hadoop.hive.metastore.TableType)). Please let me know the
resolution for this.

Thanks


spark.stop() cannot stop spark connect session

2023-10-20 Thread eab...@163.com
Hi,
my code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://172.29.190.147").getOrCreate()

import pandas as pd
# create a pandas dataframe
pdf = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"gender": ["F", "M", "M"]
})

# convert the pandas dataframe to a Spark dataframe
sdf = spark.createDataFrame(pdf)

# show the Spark dataframe
sdf.show()

spark.stop()

After stop, executing sdf.show() throws
pyspark.errors.exceptions.connect.SparkConnectException: [NO_ACTIVE_SESSION] No 
active Spark session found. Please create a new Spark session before running 
the code. Visit the Spark web UI at http://172.29.190.147:4040/connect/ to 
check if the current session is still running and has not been stopped yet.
From the Spark web UI, the session still shows as online:
1 session(s) are online, running 0 Request(s)
Session Statistics (1): Session ID 29f05cde-8f8b-418d-95c0-8dbbbfb556d2, Start Time 2023/10/20 15:30:04, Duration 14 minutes 49 seconds, Total Execute 2
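
As the NO_ACTIVE_SESSION message suggests, a new session has to be created before the DataFrame can be used again. A minimal sketch (it reuses the endpoint from the code above; the DataFrame also has to be rebuilt, since it was bound to the stopped session):

```python
# Sketch only: after spark.stop(), build a fresh Spark Connect session and
# recreate any DataFrames that were bound to the stopped session.
spark = SparkSession.builder.remote("sc://172.29.190.147").getOrCreate()
sdf = spark.createDataFrame(pdf)  # pdf is the pandas DataFrame defined earlier
sdf.show()
```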


eabour


"Premature end of Content-Length" Error

2023-10-19 Thread Sandhya Bala
Hi all,

I am running into the following error with spark 2.4.8

Job aborted due to stage failure: Task 9 in stage 2.0 failed 4 times, most
> recent failure: Lost task 9.3 in stage 2.0 (TID 100, 10.221.8.73, executor
> 2): org.apache.http.ConnectionClosedException: Premature end of
> Content-Length delimited message body (expected: 106327; received: 3477


but my current code doesn't have neither hadoop-aws nor aws-java-sdk.

Any suggestions on what could be the problem?

Thanks,
Sandhya


Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes

2023-10-19 Thread eab...@163.com
Hi,
I have found three important classes:
org.apache.spark.sql.connect.service.SparkConnectServer: the 
./sbin/start-connect-server.sh script uses the SparkConnectServer class as its main 
class. In the main function, it uses SparkSession.builder.getOrCreate() to create a 
local session and starts the SparkConnectService.
org.apache.spark.sql.connect.SparkConnectPlugin: to enable Spark Connect, 
simply make sure that the appropriate JAR is available in the CLASSPATH and that the 
driver plugin is configured to load this class.
org.apache.spark.sql.connect.SimpleSparkConnectService: a simple main class 
method to start the Spark Connect server as a service for client tests.

   So, I believe that by configuring spark.plugins and starting the Spark 
cluster on Kubernetes, clients can use sc://ip:port to connect to the 
remote server.
   Let me give it a try.
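
A rough sketch of what that could look like (the plugin class name comes from the list above; the service host/port and the rest of the Kubernetes setup are assumptions, not a tested recipe):

```python
# Server side (conceptual): the driver started on the Kubernetes cluster would be
# launched with --conf spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin
# so that the Connect service is loaded into the driver.

# Client side: a plain PySpark client pointed at the exposed service endpoint.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .remote("sc://spark-connect.example.com:15002")  # assumed host; 15002 is the usual Connect port
         .getOrCreate())

spark.range(5).show()  # quick smoke test against the remote cluster
```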



eabour
 
From: eab...@163.com
Date: 2023-10-19 14:28
To: Nagatomi Yasukazu; user @spark
Subject: Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Hi all, 
Has the functionality of running the Spark Connect server on k8s been implemented?



 
From: Nagatomi Yasukazu
Date: 2023-09-05 17:51
To: user
Subject: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Dear Spark Community,

I've been exploring the capabilities of the Spark Connect Server and 
encountered an issue when trying to launch it in a cluster deploy mode with 
Kubernetes as the master.

While initiating the `start-connect-server.sh` script with the `--conf` 
parameter for `spark.master` and `spark.submit.deployMode`, I was met with an 
error message:

```
Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode 
is not applicable to Spark Connect server.
```

This error message can be traced back to Spark's source code here:
https://github.com/apache/spark/blob/6c885a7cf57df328b03308cff2eed814bda156e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L307

Given my observations, I'm curious about the Spark Connect Server roadmap:

Is there a plan or current conversation to enable Kubernetes as a master in 
Spark Connect Server's cluster deploy mode?

I have tried to gather information from existing JIRA tickets, but have not 
been able to get a definitive answer:

https://issues.apache.org/jira/browse/SPARK-42730
https://issues.apache.org/jira/browse/SPARK-39375
https://issues.apache.org/jira/browse/SPARK-44117

Any thoughts, updates, or references to similar conversations or initiatives 
would be greatly appreciated.

Thank you for your time and expertise!

Best regards,
Yasukazu

2023年9月5日(火) 12:09 Nagatomi Yasukazu :
Hello Mich,
Thank you for your questions. Here are my responses:

> 1. What investigation have you done to show that it is running in local mode?

I have verified through the History Server's Environment tab that:
- "spark.master" is set to local[*]
- "spark.app.id" begins with local-xxx
- "spark.submit.deployMode" is set to local


> 2. who has configured this kubernetes cluster? Is it supplied by a cloud 
> vendor?

Our Kubernetes cluster was set up in an on-prem environment using RKE2( 
https://docs.rke2.io/ ).


> 3. Confirm that you have configured Spark Connect Server correctly for 
> cluster mode. Make sure you specify the cluster manager (e.g., Kubernetes) 
> and other relevant Spark configurations in your Spark job submission.

Based on the Spark Connect documentation I've read, there doesn't seem to be 
any specific settings for cluster mode related to the Spark Connect Server.

Configuration - Spark 3.4.1 Documentation
https://spark.apache.org/docs/3.4.1/configuration.html#spark-connect

Quickstart: Spark Connect — PySpark 3.4.1 documentation
https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_connect.html

Spark Connect Overview - Spark 3.4.1 Documentation
https://spark.apache.org/docs/latest/spark-connect-overview.html

The documentation only suggests running ./sbin/start-connect-server.sh 
--packages org.apache.spark:spark-connect_2.12:3.4.0, leaving me at a loss.


> 4. Can you provide a full spark submit command

Given the nature of Spark Connect, I don't use the spark-submit command. 
Instead, as per the documentation, I can execute workloads using only a Python 
script. For the Spark Connect Server, I have a Kubernetes manifest executing 
"/opt.spark/sbin/start-connect-server.sh --packages 
org.apache.spark:spark-connect_2.12:3.4.0".


> 5. Make sure that the Python client script connecting to Spark Connect Server 
> specifies the cluster mode explicitly, like using --master or --deploy-mode 
> flags when creating a SparkSession.

The Spark Connect Server operates as a Driver, so it isn't possible to specify 
the --master or --deploy-mode flags in the Python client script. If I try, I 
encounter a RuntimeError.

like this:
RuntimeError: Spark master cannot be configured with Spark Connect server; 
however, found URL for Spark Connect [sc://.../]


> 6. Ensure that you have allocated 

Re: Re: Running Spark Connect Server in Cluster Mode on Kubernetes

2023-10-19 Thread eab...@163.com
Hi all, 
Has the functionality of running the Spark Connect server on k8s been implemented?



 
From: Nagatomi Yasukazu
Date: 2023-09-05 17:51
To: user
Subject: Re: Running Spark Connect Server in Cluster Mode on Kubernetes
Dear Spark Community,

I've been exploring the capabilities of the Spark Connect Server and 
encountered an issue when trying to launch it in a cluster deploy mode with 
Kubernetes as the master.

While initiating the `start-connect-server.sh` script with the `--conf` 
parameter for `spark.master` and `spark.submit.deployMode`, I was met with an 
error message:

```
Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode 
is not applicable to Spark Connect server.
```

This error message can be traced back to Spark's source code here:
https://github.com/apache/spark/blob/6c885a7cf57df328b03308cff2eed814bda156e4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L307

Given my observations, I'm curious about the Spark Connect Server roadmap:

Is there a plan or current conversation to enable Kubernetes as a master in 
Spark Connect Server's cluster deploy mode?

I have tried to gather information from existing JIRA tickets, but have not 
been able to get a definitive answer:

https://issues.apache.org/jira/browse/SPARK-42730
https://issues.apache.org/jira/browse/SPARK-39375
https://issues.apache.org/jira/browse/SPARK-44117

Any thoughts, updates, or references to similar conversations or initiatives 
would be greatly appreciated.

Thank you for your time and expertise!

Best regards,
Yasukazu

2023年9月5日(火) 12:09 Nagatomi Yasukazu :
Hello Mich,
Thank you for your questions. Here are my responses:

> 1. What investigation have you done to show that it is running in local mode?

I have verified through the History Server's Environment tab that:
- "spark.master" is set to local[*]
- "spark.app.id" begins with local-xxx
- "spark.submit.deployMode" is set to local


> 2. who has configured this kubernetes cluster? Is it supplied by a cloud 
> vendor?

Our Kubernetes cluster was set up in an on-prem environment using RKE2( 
https://docs.rke2.io/ ).


> 3. Confirm that you have configured Spark Connect Server correctly for 
> cluster mode. Make sure you specify the cluster manager (e.g., Kubernetes) 
> and other relevant Spark configurations in your Spark job submission.

Based on the Spark Connect documentation I've read, there doesn't seem to be 
any specific settings for cluster mode related to the Spark Connect Server.

Configuration - Spark 3.4.1 Documentation
https://spark.apache.org/docs/3.4.1/configuration.html#spark-connect

Quickstart: Spark Connect — PySpark 3.4.1 documentation
https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_connect.html

Spark Connect Overview - Spark 3.4.1 Documentation
https://spark.apache.org/docs/latest/spark-connect-overview.html

The documentation only suggests running ./sbin/start-connect-server.sh 
--packages org.apache.spark:spark-connect_2.12:3.4.0, leaving me at a loss.


> 4. Can you provide a full spark submit command

Given the nature of Spark Connect, I don't use the spark-submit command. 
Instead, as per the documentation, I can execute workloads using only a Python 
script. For the Spark Connect Server, I have a Kubernetes manifest executing 
"/opt.spark/sbin/start-connect-server.sh --packages 
org.apache.spark:spark-connect_2.12:3.4.0".


> 5. Make sure that the Python client script connecting to Spark Connect Server 
> specifies the cluster mode explicitly, like using --master or --deploy-mode 
> flags when creating a SparkSession.

The Spark Connect Server operates as a Driver, so it isn't possible to specify 
the --master or --deploy-mode flags in the Python client script. If I try, I 
encounter a RuntimeError.

like this:
RuntimeError: Spark master cannot be configured with Spark Connect server; 
however, found URL for Spark Connect [sc://.../]


> 6. Ensure that you have allocated the necessary resources (CPU, memory etc) 
> to Spark Connect Server when running it on Kubernetes.

Resources are ample, so that shouldn't be the problem.


> 7. Review the environment variables and configurations you have set, 
> including the SPARK_NO_DAEMONIZE=1 variable. Ensure that these variables are 
> not conflicting with 

I'm unsure if SPARK_NO_DAEMONIZE=1 conflicts with cluster mode settings. But 
without it, the process goes to the background when executing 
start-connect-server.sh, causing the Pod to terminate prematurely.


> 8. Are you using the correct spark client version that is fully compatible 
> with your spark on the server?

Yes, I have verified that without using Spark Connect (e.g., using Spark 
Operator), Spark applications run as expected.

> 9. check the kubernetes error logs

The Kubernetes logs don't show any errors, and jobs are running in local mode.


> 10. Insufficient resources can lead to the application running in local mode

I wasn't aware that insufficient resources 

Re: hive: spark as execution engine. class not found problem

2023-10-17 Thread Vijay Shankar
UNSUBSCRIBE

On Tue, Oct 17, 2023 at 5:09 PM Amirhossein Kabiri <
amirhosseikab...@gmail.com> wrote:

> I used Ambari to config and install Hive and Spark. I want to insert into
> a hive table using Spark execution Engine but I face to this weird error.
> The error is:
>
> Job failed with java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> 2023-10-17 10:07:42,972 ERROR [c4aeb932-743e-4736-b00f-6b905381fa03 main]
> status.SparkJobMonitor: Job failed with java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> com.esotericsoftware.kryo.KryoException: Unable to find class:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> Serialization trace:
> invertedWorkGraph (org.apache.hadoop.hive.ql.plan.SparkWork)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
> at
> org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClass(SerializationUtilities.java:181)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
> at
> org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readObject(SerializationUtilities.java:206)
> at
> org.apache.hadoop.hive.ql.exec.spark.KryoSerializer.deserialize(KryoSerializer.java:60)
> at
> org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:329)
> at
> org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:378)
> at
> org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
> ... 15 more
>
> 2023-10-17 10:07:43,067 INFO  [c4aeb932-743e-4736-b00f-6b905381fa03 main]
> reexec.ReOptimizePlugin: ReOptimization: retryPossible: false
> FAILED: Execution Error, return code 3 from
> org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed during
> runtime. Please check stacktrace for the root cause.
>
> the weird part is Hive make this itself and asks me where to find it! I
> would appreciate any helps to solve and locate the problem.
>
> note: The Ambari, Hadoop, Hive, Zookeeper and Spark Works Well according
> to the Ambari service health check.
> note: Since I didnt find any spark specific hive-site.xml I added the
> following configs to the hive-site.xml file:
> 
>   hive.execution.engine
>   spark
> 
>
> 
>   hive.spark.warehouse.location
>   /tmp/spark/warehouse
> 
>
> 
>   hive.spark.sql.execution.mode
>   adaptive
> 
>
> 
>   hive.spark.sql.shuffle.partitions
>   200
> 
>
> 
>   hive.spark.sql.shuffle.partitions.pernode
>   2
> 
>
> 
>   hive.spark.sql.memory.fraction
>   0.6
> 
>
> 
>   hive.spark.sql.codegen.enabled
>   true
> 
>
> 
>   spark.sql.hive.hiveserver2.jdbc.url
>   jdbc:hive2://my.ambari.com:2181
> /;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
> 
>
> 
>   spark.datasource.hive.warehouse.load.staging.dir
>   /tmp
> 
>
>
> 
>   spark.hadoop.hive.zookeeper.quorum
>   my.ambari.com:2181
> 
>
> 
>
> spark.datasource.hive.warehouse.write.path.strictColumnNamesMapping
>   true
> 
>
> 
>   spark.sql.hive.conf.list
>
> hive.vectorized.execution.filesink.arrow.native.enabled=true;hive.vectorized.execution.enabled=true
> 
>
> 
>   hive.spark.client.connect.timeout
>   3ms
> 
>
> 
>   hive.spark.client.server.connect.timeout
>   30ms
>
> 
> hive.hook.proto.base-directory
> /tmp/hive/hooks
>   
>   
> hive.spark.sql.shuffle.partitions
> 200
>   
>   
> hive.strict.managed.tables
> true
>   
>   
> hive.stats.fetch.partition.stats
> true
>   
>   
> hive.spark.sql.memory.fraction
> 0.6
>   
>   
> hive.spark.sql.execution.mode
> spark
>   
>   
> hive.spark.sql.codegen.enabled
> 

hive: spark as execution engine. class not found problem

2023-10-17 Thread Amirhossein Kabiri
I used Ambari to configure and install Hive and Spark. I want to insert into a
Hive table using the Spark execution engine, but I am facing this weird error. The
error is:

Job failed with java.lang.ClassNotFoundException:
ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
2023-10-17 10:07:42,972 ERROR [c4aeb932-743e-4736-b00f-6b905381fa03 main]
status.SparkJobMonitor: Job failed with java.lang.ClassNotFoundException:
ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
com.esotericsoftware.kryo.KryoException: Unable to find class:
ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
Serialization trace:
invertedWorkGraph (org.apache.hadoop.hive.ql.plan.SparkWork)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
at
org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClass(SerializationUtilities.java:181)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
at
org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readObject(SerializationUtilities.java:206)
at
org.apache.hadoop.hive.ql.exec.spark.KryoSerializer.deserialize(KryoSerializer.java:60)
at
org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:329)
at
org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:378)
at
org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
ive_20231017100559_301568f9-bdfa-4f7c-89a6-f69a65b30aaf:1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
... 15 more

2023-10-17 10:07:43,067 INFO  [c4aeb932-743e-4736-b00f-6b905381fa03 main]
reexec.ReOptimizePlugin: ReOptimization: retryPossible: false
FAILED: Execution Error, return code 3 from
org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed during
runtime. Please check stacktrace for the root cause.

The weird part is that Hive makes this class name itself and then asks me where to
find it! I would appreciate any help to solve and locate the problem.

Note: Ambari, Hadoop, Hive, Zookeeper and Spark work well according to
the Ambari service health check.
Note: Since I didn't find any Spark-specific hive-site.xml, I added the
following configs to the hive-site.xml file:

<property>
  <name>hive.execution.engine</name>
  <value>spark</value>
</property>

<property>
  <name>hive.spark.warehouse.location</name>
  <value>/tmp/spark/warehouse</value>
</property>

<property>
  <name>hive.spark.sql.execution.mode</name>
  <value>adaptive</value>
</property>

<property>
  <name>hive.spark.sql.shuffle.partitions</name>
  <value>200</value>
</property>

<property>
  <name>hive.spark.sql.shuffle.partitions.pernode</name>
  <value>2</value>
</property>

<property>
  <name>hive.spark.sql.memory.fraction</name>
  <value>0.6</value>
</property>

<property>
  <name>hive.spark.sql.codegen.enabled</name>
  <value>true</value>
</property>

<property>
  <name>spark.sql.hive.hiveserver2.jdbc.url</name>
  <value>jdbc:hive2://my.ambari.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2</value>
</property>

<property>
  <name>spark.datasource.hive.warehouse.load.staging.dir</name>
  <value>/tmp</value>
</property>

<property>
  <name>spark.hadoop.hive.zookeeper.quorum</name>
  <value>my.ambari.com:2181</value>
</property>

<property>
  <name>spark.datasource.hive.warehouse.write.path.strictColumnNamesMapping</name>
  <value>true</value>
</property>

<property>
  <name>spark.sql.hive.conf.list</name>
  <value>hive.vectorized.execution.filesink.arrow.native.enabled=true;hive.vectorized.execution.enabled=true</value>
</property>

<property>
  <name>hive.spark.client.connect.timeout</name>
  <value>3ms</value>
</property>

<property>
  <name>hive.spark.client.server.connect.timeout</name>
  <value>30ms</value>
</property>

<property>
  <name>hive.hook.proto.base-directory</name>
  <value>/tmp/hive/hooks</value>
</property>

<property>
  <name>hive.spark.sql.shuffle.partitions</name>
  <value>200</value>
</property>

<property>
  <name>hive.strict.managed.tables</name>
  <value>true</value>
</property>

<property>
  <name>hive.stats.fetch.partition.stats</name>
  <value>true</value>
</property>

<property>
  <name>hive.spark.sql.memory.fraction</name>
  <value>0.6</value>
</property>

<property>
  <name>hive.spark.sql.execution.mode</name>
  <value>spark</value>
</property>

<property>
  <name>hive.spark.sql.codegen.enabled</name>
  <value>true</value>
</property>

<property>
  <name>hive.heapsize</name>
  <value>2g</value>
</property>

<property>
  <name>hive.spark.sql.shuffle.partitions.pernode</name>
  <value>100</value>
</property>

<property>
  <name>hive.spark.warehouse.location</name>
  <value>/user/hive/warehouse</value>
</property>



Re: Spark stand-alone mode

2023-10-17 Thread Ilango
Hi all,

Thanks a lot for your suggestions and knowledge sharing. I'd like to let you
know that I completed setting up the standalone cluster, and a couple of
data science users have already been using it for the last two weeks. The
performance is really good: almost a 10X improvement compared to HPC local
mode. They tested it with some complex data science scripts using Spark and
other data science projects. The cluster is really stable and very performant.

I enabled dynamic allocation and capped the memory and CPU accordingly in
spark-defaults.conf and in our Spark framework code. So it's been pretty
impressive for the last few weeks.
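
For readers following along, a minimal sketch of that kind of setup (these are standard Spark properties; the values are placeholders, not the actual settings used on this cluster):

```python
from pyspark.sql import SparkSession

# Illustrative only: dynamic allocation plus per-application caps on a
# standalone cluster, so several users can share it at the same time.
spark = (SparkSession.builder
         .appName("shared-standalone-job")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")  # needed without an external shuffle service
         .config("spark.dynamicAllocation.maxExecutors", "4")
         .config("spark.executor.cores", "4")
         .config("spark.executor.memory", "16g")
         .config("spark.cores.max", "16")  # standalone-mode cap per application
         .getOrCreate())
```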

Thank you so much!

Thanks,
Elango


On Tue, 19 Sep 2023 at 6:40 PM, Patrick Tucci 
wrote:

> Multiple applications can run at once, but you need to either configure
> Spark or your applications to allow that. In stand-alone mode, each
> application attempts to take all resources available by default. This
> section of the documentation has more details:
>
>
> https://spark.apache.org/docs/latest/spark-standalone.html#resource-scheduling
>
> Explicitly setting the resources per application limits the resources to
> the configured values for the lifetime of the application. You can use
> dynamic allocation to allow Spark to scale the resources up and down per
> application based on load, but the configuration is relatively more complex:
>
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> On Mon, Sep 18, 2023 at 3:53 PM Ilango  wrote:
>
>>
>> Thanks all for your suggestions. Noted with thanks.
>> Just wanted share few more details about the environment
>> 1. We use NFS for data storage and data is in parquet format
>> 2. All HPC nodes are connected and already work as a cluster for Studio
>> workbench. I can setup password less SSH if it not exist already.
>> 3. We will stick with NFS for now and stand alone then may be will
>> explore HDFS and YARN.
>>
>> Can you please confirm whether multiple users can run spark jobs at the
>> same time?
>> If so I will start working on it and let you know how it goes
>>
>> Mich, the link to Hadoop is not working. Can you please check and let me
>> know the correct link. Would like to explore Hadoop option as well.
>>
>>
>>
>> Thanks,
>> Elango
>>
>> On Sat, Sep 16, 2023, 4:20 AM Bjørn Jørgensen 
>> wrote:
>>
>>> you need to setup ssh without password, use key instead.  How to
>>> connect without password using SSH (passwordless)
>>> 
>>>
>>> fre. 15. sep. 2023 kl. 20:55 skrev Mich Talebzadeh <
>>> mich.talebza...@gmail.com>:
>>>
 Hi,

 Can these 4 nodes talk to each other through ssh as trusted hosts (on
 top of the network that Sean already mentioned)? Otherwise you need to set
 it up. You can install a LAN if you have another free port at the back of
 your HPC nodes. They should

 You ought to try to set up a Hadoop cluster pretty easily. Check this
 old article of mine for Hadoop set-up.


 https://www.linkedin.com/pulse/diy-festive-season-how-install-configure-big-data-so-mich/?trackingId=z7n5tx7tQOGK9tcG9VClkw%3D%3D

 Hadoop will provide you with a common storage layer (HDFS) that these
 nodes will be able to share and talk. Yarn is your best bet as the resource
 manager with reasonably powerful hosts you have. However, for now the Stand
 Alone mode will do. Make sure that the Metastore you choose, (by default it
 will use Hive Metastore called Derby :( ) is something respetable like
 Postgres DB that can handle multiple concurrent spark jobs

 HTH


 Mich Talebzadeh,
 Distinguished Technologist, Solutions Architect & Engineer
 London
 United Kingdom


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Fri, 15 Sept 2023 at 07:04, Ilango  wrote:

>
> Hi all,
>
> We have 4 HPC nodes and installed spark individually in all nodes.
>
> Spark is used as local mode(each driver/executor will have 8 cores and
> 65 GB) in Sparklyr/pyspark using Rstudio/Posit workbench. Slurm is used as
> scheduler.
>
> As this is local mode, we are facing performance issue(as only one
> executor) when it comes dealing with large datasets.
>
> Can I convert this 4 nodes into spark standalone cluster. We dont have
> hadoop so yarn mode is out of 

Re: Can not complete the read csv task

2023-10-14 Thread Khalid Mammadov
This command only defines a new DataFrame; in order to see some results you
need to do something like merged_spark_data.show() on a new line.

Regarding the error, I think it's a typical error that you get when you run
Spark on Windows. You can suppress it using the winutils tool (Google it or
ask ChatGPT to see how).
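
A minimal sketch of both points (the C:\hadoop location for winutils is an assumption; the CSV path is the one from the original message):

```python
import os

# Point Hadoop's native bits at a local winutils install before the Spark
# session is created. winutils.exe (and hadoop.dll) are assumed to live in
# C:\hadoop\bin.
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] += os.pathsep + r"C:\hadoop\bin"

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
merged_spark_data = spark.read.csv(
    r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
    header=False,
)
merged_spark_data.show()  # an action is required to actually read and display rows
```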

On Thu, 12 Oct 2023, 11:58 Kelum Perera,  wrote:

> Dear friends,
>
> I'm trying to get a fresh start with Spark. I tried to read few CSV files
> in a folder, but the task got stuck and not completed as shown in the
> copied content from the terminal.
>
> Can someone help to understand what is going wrong?
>
> Versions;
> java version "11.0.16" 2022-07-19 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed
> mode)
>
> Python 3.9.13
> Windows 10
>
> Copied from the terminal;
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.5.0
>   /_/
>
> Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
> Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
> Spark context available as 'sc' (master = local[*], app id =
> local-1697089858181).
> SparkSession available as 'spark'.
> >>> merged_spark_data =
> spark.read.csv(r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
> header=False )
> Exception in thread "globPath-ForkJoinPool-1-worker-115"
> java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
> at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native
> Method)
> at
> org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
> at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
> at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
> at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
> at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
> at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
> at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
> at
> org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
> at
> org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
> at scala.util.Success.$anonfun$map$1(Try.scala:255)
> at scala.util.Success.map(Try.scala:213)
> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
> at
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at
> java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>
>
>
> Nothing happens afterwards. Appreciate your kind input to solve this.
>
> Best Regards,
> Kelum Perera
>
>
>
>


[ANNOUNCE] Apache Celeborn(incubating) 0.3.1 available

2023-10-13 Thread Cheng Pan
Hi all,

Apache Celeborn(Incubating) community is glad to announce the
new release of Apache Celeborn(Incubating) 0.3.1.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, high-efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.

Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:
- https://github.com/apache/incubator-celeborn/releases/tag/v0.3.1-incubating

Release Notes:
- https://celeborn.apache.org/community/release_notes/release_note_0.3.1

Home Page: https://celeborn.apache.org/

Celeborn Resources:
- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Thanks,
Cheng Pan
On behalf of the Apache Celeborn(incubating) community







Fwd: Fw: Can not complete the read csv task

2023-10-13 Thread KP Youtuber
Dear group members,

I'm trying to get a fresh start with Spark, but came across the following
issue:

I tried to read a few CSV files from a folder, but the task got stuck and
didn't complete (content copied from the terminal below).

Can someone help to understand what is going wrong?

Versions;
java version "11.0.16" 2022-07-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed
mode)

Python 3.9.13
Windows 10

Copied from the terminal;
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
  /_/

Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
Spark context available as 'sc' (master = local[*], app id =
local-1697089858181).
SparkSession available as 'spark'.
>>> merged_spark_data =
spark.read.csv(r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
header=False )
Exception in thread "globPath-ForkJoinPool-1-worker-115"
java.lang.UnsatisfiedLinkError:
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native
Method)
at
org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at
org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
at
org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
at
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
at
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)



Nothing happens afterwards. Appreciate your kind input to solve this.

Best Regards,
Kelum Perera


Fw: Can not complete the read csv task

2023-10-13 Thread Kelum Perera


From: Kelum Perera 
Sent: Thursday, October 12, 2023 11:40 AM
To: user@spark.apache.org ; Kelum Perera 
; Kelum Gmail 
Subject: Can not complete the read csv task

Dear friends,

I'm trying to get a fresh start with Spark. I tried to read a few CSV files in a 
folder, but the task got stuck and did not complete, as shown in the content copied 
from the terminal below.

Can someone help to understand what is going wrong?

Versions;
java version "11.0.16" 2022-07-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed mode)

Python 3.9.13
Windows 10

Copied from the terminal;
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
  /_/

Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
Spark context available as 'sc' (master = local[*], app id = 
local-1697089858181).
SparkSession available as 'spark'.
>>> merged_spark_data = 
>>> spark.read.csv(r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
>>>  header=False )
Exception in thread "globPath-ForkJoinPool-1-worker-115" 
java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at 
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at 
org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
at 
org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
at 
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at 
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)



Nothing happens afterwards. Appreciate your kind input to solve this.

Best Regards,
Kelum Perera





Re: [ SPARK SQL ]: UPPER in WHERE condition is not working in Apache Spark 3.5.0 for Mysql ENUM Column

2023-10-13 Thread Suyash Ajmera
This issue is related to the CharVarcharCodegenUtils readSidePadding method.

It appends white spaces while reading ENUM data from MySQL, causing issues when
querying and when writing the same data to Cassandra.
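
A small standalone illustration of the effect (plain Python, not the Spark code path; the 13-character width comes from the query plans quoted below):

```python
# readSidePadding right-pads a CHAR(n) value with spaces up to n characters,
# so a CHAR(13)-style ENUM value such as 'OPEN' effectively becomes
# 'OPEN' followed by nine spaces, and comparisons against the unpadded
# literal stop matching.
padded = "OPEN".ljust(13)                  # 'OPEN         '
print(repr(padded))
print(padded.upper() == "OPEN")            # False -> rows disappear from the result
print(padded.rstrip().upper() == "OPEN")   # True once trailing spaces are removed
```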

On Thu, 12 Oct, 2023, 7:46 pm Suyash Ajmera, 
wrote:

> I have upgraded my spark job from spark 3.3.1 to spark 3.5.0, I am
> querying to Mysql Database and applying
>
> `*UPPER(col) = UPPER(value)*` in the subsequent sql query. It is working
> as expected in spark 3.3.1 , but not working with 3.5.0.
>
> Where Condition ::  `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR
> upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*`
>
> The *st *column is ENUM in the database and it is causing the issue.
>
> Below is the Physical Plan of *FILTER* phase :
>
> For 3.3.1 :
>
> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR
> (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))
>
> For 3.5.0 :
>
> +- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = OPEN) OR
> (upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR
> (upper(staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true)) = CLOSED)))
>
> -
>
> I have debug it and found that Spark added a property in version 3.4.0 ,
> i.e. **spark.sql.readSideCharPadding** which has default value **true**.
>
> Link to the JIRA : https://issues.apache.org/jira/browse/SPARK-40697
>
> Added a new method in Class **CharVarcharCodegenUtils**
>
> public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
> int numChars = inputStr.numChars();
> if (numChars == limit) {
>   return inputStr;
> } else if (numChars < limit) {
>   return inputStr.rpad(limit, SPACE);
> } else {
>   return inputStr;
> }
>   }
>
>
> **This method is appending some whitespace padding to the ENUM values
> while reading and causing the Issue.**
>
> ---
>
> When I am removing the UPPER function from the where condition the
> **FILTER** Phase looks like this :
>
>  +- Filter (((staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
>  StringType, readSidePadding, st#42, 13, true, false, true) = OPEN
> ) OR (staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true) = REOPEN   )) OR
> (staticinvoke(class
> org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
> readSidePadding, st#42, 13, true, false, true) = CLOSED   ))
>
>
> **You can see it has added some white space after the value and the query
> runs fine giving the correct result.**
>
> But with the UPPER function I am not getting the data.
>
> --
>
> I have also tried to disable this Property *spark.sql.readSideCharPadding
> = false* with following cases :
>
> 1. With Upper function in where clause :
>It is not pushing the filters to Database and the *query works fine*.
>
>   +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR
> (upper(st#42) = CLOSED))
>
> 2. But when I am removing the upper function
>
>  *It is pushing the filter to Mysql with the white spaces and I am not
> getting the data. (THIS IS A CAUSING VERY BIG ISSUE)*
>
>   PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON),
> *Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN
> )),EqualTo(st,CLOSED   ))]
>
> I cannot move this filter to JDBC read query , also I can't remove this
> UPPER function in the where clause.
>
>
> 
>
> Also I found same data getting written to CASSANDRA with *PADDING .*
>


[ SPARK SQL ]: UPPER in WHERE condition is not working in Apache Spark 3.5.0 for Mysql ENUM Column

2023-10-12 Thread Suyash Ajmera
I have upgraded my Spark job from Spark 3.3.1 to Spark 3.5.0. I am querying
a MySQL database and applying

`*UPPER(col) = UPPER(value)*` in the subsequent SQL query. It is working as
expected in Spark 3.3.1, but not with 3.5.0.

Where Condition ::  `*UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR
upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')*`

The *st *column is ENUM in the database and it is causing the issue.

Below is the Physical Plan of *FILTER* phase :

For 3.3.1 :

+- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR
(upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))

For 3.5.0 :

+- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = OPEN) OR
(upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR
(upper(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true)) = CLOSED)))

-

I have debugged it and found that Spark added a property in version 3.4.0,
i.e. **spark.sql.readSideCharPadding**, which has the default value **true**.

Link to the JIRA : https://issues.apache.org/jira/browse/SPARK-40697

Added a new method in Class **CharVarcharCodegenUtils**

public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
int numChars = inputStr.numChars();
if (numChars == limit) {
  return inputStr;
} else if (numChars < limit) {
  return inputStr.rpad(limit, SPACE);
} else {
  return inputStr;
}
  }


**This method is appending some whitespace padding to the ENUM values while
reading and causing the Issue.**

---

When I am removing the UPPER function from the where condition the
**FILTER** Phase looks like this :

 +- Filter (((staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
 StringType, readSidePadding, st#42, 13, true, false, true) = OPEN
) OR (staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true) = REOPEN   )) OR
(staticinvoke(class
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType,
readSidePadding, st#42, 13, true, false, true) = CLOSED   ))


**You can see it has added some white space after the value and the query
runs fine giving the correct result.**

But with the UPPER function I am not getting the data.

--

I have also tried to disable this Property *spark.sql.readSideCharPadding =
false* with following cases :

1. With Upper function in where clause :
   It is not pushing the filters to Database and the *query works fine*.

  +- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR
(upper(st#42) = CLOSED))

2. But when I am removing the upper function

 *It is pushing the filter to Mysql with the white spaces and I am not
getting the data. (THIS IS CAUSING A VERY BIG ISSUE)*

  PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON),
*Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN
)),EqualTo(st,CLOSED   ))]

I cannot move this filter to the JDBC read query, and I also can't remove this
UPPER function in the WHERE clause.



Also, I found the same data getting written to CASSANDRA with *PADDING*.
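
For reference, the property discussed above can be toggled per session as a quick experiment (a sketch only; as described above, disabling it changes pushdown behaviour, so it is not offered here as a fix, and the table name t is hypothetical):

```python
# Experiment described in the thread: disable read-side CHAR padding for this
# session and re-run the query. Note the pushdown side effects mentioned above.
spark.conf.set("spark.sql.readSideCharPadding", "false")
spark.sql("SELECT * FROM t WHERE UPPER(vn) = 'ERICSSON' AND UPPER(st) = 'OPEN'").show()
```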


Can not complete the read csv task

2023-10-12 Thread Kelum Perera
Dear friends,

I'm trying to get a fresh start with Spark. I tried to read a few CSV files in a 
folder, but the task got stuck and did not complete, as shown in the content copied 
from the terminal below.

Can someone help to understand what is going wrong?

Versions;
java version "11.0.16" 2022-07-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed mode)

Python 3.9.13
Windows 10

Copied from the terminal;
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
  /_/

Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
Spark context available as 'sc' (master = local[*], app id = 
local-1697089858181).
SparkSession available as 'spark'.
>>> merged_spark_data = 
>>> spark.read.csv(r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
>>>  header=False )
Exception in thread "globPath-ForkJoinPool-1-worker-115" 
java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at 
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at 
org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
at 
org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
at 
org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at 
java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)



Nothing happens afterwards. Appreciate your kind input to solve this.

Best Regards,
Kelum Perera





Re: Autoscaling in Spark

2023-10-10 Thread Mich Talebzadeh
This has been brought up a few times. I will focus on Spark Structured
Streaming

Autoscaling does not support Spark Structured Streaming (SSS). Why? Because
streaming jobs are typically long-running jobs that need to maintain state
across micro-batches. Autoscaling is designed to scale Spark clusters up and
down in response to workload changes. However, this would cause problems
for Spark Structured Streaming jobs because it would cause the jobs to lose
their state. These jobs continuously process incoming data and update their
state incrementally (see the checkpoint directory). Autoscaling, which can
dynamically add or remove worker nodes, would disrupt this stateful
processing. Although Spark itself supports dynamic allocation (i.e. it can
add or remove executor nodes based on demand), this is not the same as
autoscaling in cloud environments like GCP, e.g. Kubernetes or managed
clusters. For now you need to plan your SSS workload accordingly.
My general advice: the usual thing to watch from the Spark GUI is

Processing Time (Process Rate) + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink has an issue absorbing data in a timely manner, as per the
formula above, you will see the defect in the Processing Rate.

The Batch Interval is driven by the rate at which the upstream source sends
messages through Kafka or another source. We can start by assuming that the
rate of increase in the number of messages processed (processing time) will
require *additional reserved capacity*. We can anticipate a heuristic 70%
(~1 SD) increase in the processing time, so in theory you should be able to
handle all this work below the batch interval.
HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 10 Oct 2023 at 16:11, Kiran Biswal  wrote:

> Hello Experts
>
> Is there any true auto scaling option for spark? The dynamic auto scaling
> works only for batch. Any guidelines on spark streaming  autoscaling and
> how that will be tied to any cluster level autoscaling solutions?
>
> Thanks
>


Autoscaling in Spark

2023-10-10 Thread Kiran Biswal
Hello Experts

Is there any true auto scaling option for spark? The dynamic auto scaling
works only for batch. Any guidelines on spark streaming  autoscaling and
how that will be tied to any cluster level autoscaling solutions?

Thanks


Re: Updating delta file column data

2023-10-10 Thread Mich Talebzadeh
Hi,

Since you mentioned that there could be duplicate records with the same
unique key in the Delta table, you will need a way to handle these
duplicate records. One approach I can suggest is to use a timestamp to
determine the latest or most relevant record among duplicates: a so-called
op_time column, added at ingestion time with
df = df.withColumn("op_time", current_timestamp()), so you can pick the
most relevant record.

This is the pseudo-code suggestion

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct
appName = "DeltaHexToIntConversion"
spark = SparkSession.builder.appName(appName).getOrCreate()
delta_table_path = "path_to_your_delta_table"
df = spark.read.format("delta").load(delta_table_path)
df = df.withColumn(
"exploded_data",
struct(col("data.field1").cast("int").alias("field1_int"),
col("data.field2"))
)
df = df.select("other_columns", "exploded_data.field1_int",
"exploded_data.field2")
# Handling duplicates: keep the most relevant record among duplicates,
# here the latest one by the op_time timestamp mentioned above
from pyspark.sql import Window
from pyspark.sql.functions import row_number
w = Window.partitionBy("unique_key").orderBy(col("op_time").desc())
df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
# write the DataFrame back to the Delta table (pseudo-code; a real upsert
# uses the Delta MERGE API, see the sketch below)
df.write.format("delta").mode("merge").option("mergeSchema",
"true").save(delta_table_path)


HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 9 Oct 2023 at 17:12, Mich Talebzadeh 
wrote:

> In a nutshell, is this what you are trying to do?
>
>
>1. Read the Delta table into a Spark DataFrame.
>2. Explode the string column into a struct column.
>3. Convert the hexadecimal field to an integer.
>4. Write the DataFrame back to the Delta table in merge mode with a
>unique key.
>
> Is this a fair assessment
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 9 Oct 2023 at 14:46, Karthick Nk  wrote:
>
>> Hi All,
>>
>> I have  mentioned the sample data below and the operation I need to
>> perform over there,
>>
>> I have delta tables with columns, in that columns I have the data in the
>> string data type(contains the struct data),
>>
>> So, I need to update one key value in the struct field data in the string
>> column of the delta table.
>>
>> Note: I can able to explode the string column into the struct field and
>> into the individual field by using the following operation in the spark,
>>
>> [image: image.png]
>>
>> df_new = spark.read.json(df.rdd.map(lambda x: '{"data": ' + x.data + '}'))
>>
>> Could you suggest a possible way to perform the required action in an
>> optimistic way?
>>
>> Note: Please feel free to ask, if you need further information.
>>
>> Thanks & regards,
>> Karthick
>>
>> On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk 
>> wrote:
>>
>>> Hi community members,
>>>
>>> In databricks adls2 delta tables, I need to perform the below operation,
>>> could you help me with your thoughts
>>>
>>>  I have the delta tables with one colum with data type string , which
>>> contains the json data in string data type, I need to do the following
>>> 1. I have to update one particular field value in the json and update it
>>> back in the same column of the data.
>>>
>>> Example :
>>>
>>> In string column, inside json I have one field with value in hexadecimal.
>>> Like { version : ''0xabcd1234"}
>>>
>>> I have to convert this field into corresponding integer value and update
>>> back into same strong column json value.
>>> Note: I have to perform this operation within this column. This column
>>> is basically with data type string in delta table.
>>>
>>> Could you suggest some sample example.
>>>
>>> Thanks in advance.
>>>
>>


Re: Log file location in Spark on K8s

2023-10-09 Thread Prashant Sharma
Hi Sanket,

Driver and executor logs are written to stdout by default; this can be
configured using the SPARK_HOME/conf/log4j.properties file. The entire
SPARK_HOME/conf directory is automatically propagated to all driver and
executor containers and mounted as a volume.

Thanks

On Mon, 9 Oct, 2023, 5:37 pm Agrawal, Sanket,
 wrote:

> Hi All,
>
>
>
> We are trying to send the spark logs using fluent-bit. We validated that
> fluent-bit is able to move logs of all other pods except the
> driver/executor pods.
>
>
>
> It would be great if someone can guide us where should I look for spark
> logs in Spark on Kubernetes with client/cluster mode deployment.
>
>
>
> Thanks,
> Sanket A.
>
>


Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Danilo Sousa
Unsubscribe

> On 9 Oct 2023, at 07:03, Mich Talebzadeh wrote:
> 
> Hi,
> 
> Please see my responses below:
> 
> 1) In Spark Structured Streaming does commit mean streaming data has been 
> delivered to the sink like Snowflake?
> 
> No. a commit does not refer to data being delivered to a sink like Snowflake 
> or bigQuery. The term commit refers to Spark Structured Streaming (SS) 
> internals. Specifically it means that a micro-batch of data has been 
> processed by SSS. In the checkpoint directory there is a subdirectory called 
> commits that marks the micro-batch process as completed.
> 
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a 
> timely manner, will there be an impact on spark streaming itself?
> 
> Yes, it can potentially impact SSS. If the sink cannot absorb data in a 
> timely manner, the batches will start to back up in SSS. This can cause Spark 
> to run out of memory and the streaming job to fail. As I understand, Spark 
> will use a combination of memory and disk storage (checkpointing). This can 
> also happen if the network interface between Spark and the sink is disrupted. 
> On the other hand Spark may slow down, as it tries to process the backed-up 
> batches of data. You want to avoid these scenarios.
> 
> HTH
> 
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
> 
>view my Linkedin profile 
> 
> 
>  https://en.everybodywiki.com/Mich_Talebzadeh
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID 
>  wrote:
>> Hello team
>> 
>> 1) In Spark Structured Streaming does commit mean streaming data has been 
>> delivered to the sink like Snowflake?
>> 
>> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a 
>> timely manner, will there be an impact on spark streaming itself?
>> 
>> Thanks
>> 
>> AK



Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Mich Talebzadeh
Your mileage varies. Often there is already a flavour of cloud data
warehouse (CDW) in place: BigQuery, Redshift, Snowflake and so forth. They
can all do a good job to varying degrees.

   - Use efficient data types. Choose data types that are efficient for
   Spark to process. For example, use integer data types for columns that
   store integer values.
   - Avoid using complex data types. Complex data types, such as nested
   structs and arrays, can be less efficient for Spark to process.
   - Opt for columnar storage format like Parquet for your sink table.
   Columnar storage is highly efficient for analytical workloads as it allows
   for column-level compression and predicate pushdown.
   - These CDWs come with partitioning options. Date or time columns are
   popular partition keys, and partitioning will reduce the amount of data
   scanned during queries.
   - Some of these CDWs come with native streaming capabilities, like
   BigQuery Streaming; I believe Snowflake has a Snowpipe Streaming API as
   well (I don't know much about it). These options enable real-time data
   ingestion and processing, with no need for manual batch processing.
   - You can batch incoming data for efficiency processing, which can
   improve performance and simplify data handling. Start by configuring your
   Spark Streaming context with an appropriate batch interval. The batch
   interval defines how often Spark will process a batch of data. Choose a
   batch interval that balances latency and throughput based on the
   application's needs. Spark can process batches of data more efficiently
   than it can process individual records.
   - Snowflake says it is serverless and so is BigQuery. They are designed
   to provide a uniform performance regardless of workload. Serverless CDWs
   can efficiently handle both batch and streaming workloads without the need
   for manual resource provisioning.
   - Use materialized views to pre-compute query results, which can improve
   the performance of frequently executed queries. This has been around since
   classic RDBMSs.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 9 Oct 2023 at 17:50, ashok34...@yahoo.com 
wrote:

> Thank you for your feedback Mich.
>
> In general how can one optimise the cloud data warehouses (the sink part),
> to handle streaming Spark data efficiently, avoiding bottlenecks that
> discussed.
>
>
> AK
> On Monday, 9 October 2023 at 11:04:41 BST, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Hi,
>
> Please see my responses below:
>
> 1) In Spark Structured Streaming does commit mean streaming data has been
> delivered to the sink like Snowflake?
>
> No. a commit does not refer to data being delivered to a sink like
> Snowflake or bigQuery. The term commit refers to Spark Structured Streaming
> (SS) internals. Specifically it means that a micro-batch of data has been
> processed by SSS. In the checkpoint directory there is a
> subdirectory called commits that marks the micro-batch process as completed.
>
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a
> timely manner, will there be an impact on spark streaming itself?
>
> Yes, it can potentially impact SSS. If the sink cannot absorb data in a
> timely manner, the batches will start to back up in SSS. This can cause
> Spark to run out of memory and the streaming job to fail. As I understand,
> Spark will use a combination of memory and disk storage (checkpointing).
> This can also happen if the network interface between Spark and the sink is
> disrupted. On the other hand Spark may slow down, as it tries to process
> the backed-up batches of data. You want to avoid these scenarios.
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID
>  wrote:
>
> Hello team
>
> 1) In Spark Structured Streaming does commit mean streaming data 

Re: Clarification with Spark Structured Streaming

2023-10-09 Thread ashok34...@yahoo.com.INVALID
Thank you for your feedback Mich.

In general, how can one optimise the cloud data warehouses (the sink part) to
handle streaming Spark data efficiently, avoiding the bottlenecks discussed?

AK

On Monday, 9 October 2023 at 11:04:41 BST, Mich Talebzadeh wrote:
 
 Hi,
Please see my responses below:
1) In Spark Structured Streaming does commit mean streaming data has been 
delivered to the sink like Snowflake?

No. a commit does not refer to data being delivered to a sink like Snowflake or 
bigQuery. The term commit refers to Spark Structured Streaming (SS) internals. 
Specifically it means that a micro-batch of data has been processed by SSS. In 
the checkpoint directory there is a subdirectory called commits that marks the 
micro-batch process as completed.
2) if sinks like Snowflake  cannot absorb or digest streaming data in a timely 
manner, will there be an impact on spark streaming itself?

Yes, it can potentially impact SSS. If the sink cannot absorb data in a timely 
manner, the batches will start to back up in SSS. This can cause Spark to run 
out of memory and the streaming job to fail. As I understand, Spark will use a 
combination of memory and disk storage (checkpointing). This can also happen if 
the network interface between Spark and the sink is disrupted. On the other 
hand Spark may slow down, as it tries to process the backed-up batches of data. 
You want to avoid these scenarios.
HTH
Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 

 


On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID 
 wrote:

Hello team
1) In Spark Structured Streaming does commit mean streaming data has been 
delivered to the sink like Snowflake?
2) if sinks like Snowflake  cannot absorb or digest streaming data in a timely 
manner, will there be an impact on spark streaming itself?
Thanks

AK
  

Re: Updating delta file column data

2023-10-09 Thread Mich Talebzadeh
In a nutshell, is this what you are trying to do?


   1. Read the Delta table into a Spark DataFrame.
   2. Explode the string column into a struct column.
   3. Convert the hexadecimal field to an integer.
   4. Write the DataFrame back to the Delta table in merge mode with a
   unique key.

Is this a fair assessment?

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 9 Oct 2023 at 14:46, Karthick Nk  wrote:

> Hi All,
>
> I have  mentioned the sample data below and the operation I need to
> perform over there,
>
> I have delta tables with columns, in that columns I have the data in the
> string data type(contains the struct data),
>
> So, I need to update one key value in the struct field data in the string
> column of the delta table.
>
> Note: I can able to explode the string column into the struct field and
> into the individual field by using the following operation in the spark,
>
> [image: image.png]
>
> df_new = spark.read.json(df.rdd.map(lambda x: '{"data": ' + x.data + '}'))
>
> Could you suggest a possible way to perform the required action in an
> optimistic way?
>
> Note: Please feel free to ask, if you need further information.
>
> Thanks & regards,
> Karthick
>
> On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk  wrote:
>
>> Hi community members,
>>
>> In databricks adls2 delta tables, I need to perform the below operation,
>> could you help me with your thoughts
>>
>>  I have the delta tables with one colum with data type string , which
>> contains the json data in string data type, I need to do the following
>> 1. I have to update one particular field value in the json and update it
>> back in the same column of the data.
>>
>> Example :
>>
>> In string column, inside json I have one field with value in hexadecimal.
>> Like { version : ''0xabcd1234"}
>>
>> I have to convert this field into corresponding integer value and update
>> back into same strong column json value.
>> Note: I have to perform this operation within this column. This column is
>> basically with data type string in delta table.
>>
>> Could you suggest some sample example.
>>
>> Thanks in advance.
>>
>


Log file location in Spark on K8s

2023-10-09 Thread Agrawal, Sanket
Hi All,

We are trying to send the spark logs using fluent-bit. We validated that 
fluent-bit is able to move logs of all other pods except the driver/executor 
pods.

> It would be great if someone could guide us on where to look for Spark
> logs in Spark on Kubernetes with client/cluster mode deployments.

Thanks,
Sanket A.



Re: Clarification with Spark Structured Streaming

2023-10-09 Thread Mich Talebzadeh
Hi,

Please see my responses below:

1) In Spark Structured Streaming does commit mean streaming data has been
delivered to the sink like Snowflake?

No. a commit does not refer to data being delivered to a sink like
Snowflake or BigQuery. The term commit refers to Spark Structured Streaming
(SSS) internals. Specifically it means that a micro-batch of data has been
processed by SSS. In the checkpoint directory there is a
subdirectory called commits that marks the micro-batch process as completed.
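
As a hedged illustration of where those commit markers appear, a minimal runnable sketch using the built-in rate test source (all paths and names are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("commit-demo").getOrCreate()

# the rate source continuously emits rows, convenient for a demo
df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

query = (
    df.writeStream
    .format("parquet")                                        # any checkpointed sink
    .option("path", "/tmp/parquet/commit_demo")
    .option("checkpointLocation", "/tmp/checkpoints/commit_demo")
    .start()
)

# After a few micro-batches the checkpoint directory contains, among others:
#   /tmp/checkpoints/commit_demo/offsets/<batchId>  - micro-batch planned
#   /tmp/checkpoints/commit_demo/commits/<batchId>  - micro-batch marked completed
```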

2) if sinks like Snowflake  cannot absorb or digest streaming data in a
timely manner, will there be an impact on spark streaming itself?

Yes, it can potentially impact SSS. If the sink cannot absorb data in a
timely manner, the batches will start to back up in SSS. This can cause
Spark to run out of memory and the streaming job to fail. As I understand,
Spark will use a combination of memory and disk storage (checkpointing).
This can also happen if the network interface between Spark and the sink is
disrupted. On the other hand Spark may slow down, as it tries to process
the backed-up batches of data. You want to avoid these scenarios.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 8 Oct 2023 at 19:50, ashok34...@yahoo.com.INVALID
 wrote:

> Hello team
>
> 1) In Spark Structured Streaming does commit mean streaming data has been
> delivered to the sink like Snowflake?
>
> 2) if sinks like Snowflake  cannot absorb or digest streaming data in a
> timely manner, will there be an impact on spark streaming itself?
>
> Thanks
>
> AK
>


Re: Updating delta file column data

2023-10-09 Thread Karthick Nk
Hi All,

I have mentioned the sample data below and the operation I need to perform
on it.

I have Delta tables with a column of string data type that contains struct
data (as JSON).

So, I need to update one key value in the struct field data held in the
string column of the Delta table.

Note: I am able to explode the string column into the struct field and
into the individual fields by using the following operation in Spark:

[image: image.png]

df_new = spark.read.json(df.rdd.map(lambda x: '{"data": ' + x.data + '}'))

Could you suggest a possible way to perform the required action in an
optimal way?

Note: Please feel free to ask, if you need further information.

Thanks & regards,
Karthick

On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk  wrote:

> Hi community members,
>
> In databricks adls2 delta tables, I need to perform the below operation,
> could you help me with your thoughts
>
>  I have the delta tables with one colum with data type string , which
> contains the json data in string data type, I need to do the following
> 1. I have to update one particular field value in the json and update it
> back in the same column of the data.
>
> Example :
>
> In string column, inside json I have one field with value in hexadecimal.
> Like { version : ''0xabcd1234"}
>
> I have to convert this field into corresponding integer value and update
> back into same strong column json value.
> Note: I have to perform this operation within this column. This column is
> basically with data type string in delta table.
>
> Could you suggest some sample example.
>
> Thanks in advance.
>


Clarification with Spark Structured Streaming

2023-10-08 Thread ashok34...@yahoo.com.INVALID
Hello team
1) In Spark Structured Streaming does commit mean streaming data has been 
delivered to the sink like Snowflake?
2) if sinks like Snowflake  cannot absorb or digest streaming data in a timely 
manner, will there be an impact on spark streaming itself?
Thanks

AK

Re: Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Igor Calabria
You might be affected by this issue:
https://github.com/apache/iceberg/issues/8601

It was already patched but it isn't released yet.

On Thu, Oct 5, 2023 at 7:47 PM Prashant Sharma  wrote:

> Hi Sanket, more details might help here.
>
> How does your spark configuration look like?
>
> What exactly was done when this happened?
>
> On Thu, 5 Oct, 2023, 2:29 pm Agrawal, Sanket,
>  wrote:
>
>> Hello Everyone,
>>
>>
>>
>> We are trying to stream the changes in our Iceberg tables stored in AWS
>> S3. We are achieving this through Spark-Iceberg Connector and using JAR
>> files for Spark-AWS. Suddenly we have started receiving error “Connection
>> pool shut down”.
>>
>>
>>
>> Spark Version: 3.4.1
>>
>> Iceberg: 1.3.1
>>
>>
>>
>> Any help or guidance would of great help.
>>
>>
>>
>> Thank You,
>>
>> Sanket A.
>>
>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Angshuman Bhattacharya
Thanks Ahmed. I am trying to bring this up with Spark DE community

On Thu, Oct 5, 2023 at 12:32 PM Ahmed Albalawi <
ahmed.albal...@capitalone.com> wrote:

> Hello team,
>
> We are in the process of upgrading one of our apps to Spring Boot 3.x
> while using Spark, and we have encountered an issue with Spark
> compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> Jakarta Servlet while Spark uses Javax Servlet. Can we get some guidance on
> how to upgrade to Spring Boot 3.x while continuing to use Spark.
>
> The specific error is listed below:
>
> java.lang.NoClassDefFoundError: javax/servlet/Servlet
> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
>
> The error comes up when we try to run a mvn clean install, and the issue is 
> in our test cases. This issue happens specifically when we build our spark 
> session. The line of code it traces down to is as follows:
>
> *session = 
> SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*
>
> What we have tried:
>
> - We noticed according to this post 
> ,
>  there are no compatible versions of spark using version 5 of the Jakarta 
> Servlet API
>
> - We've tried 
> 
>  using the maven shade plugin to use jakarta instead of javax, but are 
> running into some other issues with this.
> - We've also looked at the following 
> 
>  to use jakarta 4.x with jersey 2.x and still have an issue with the servlet
>
>
> Please let us know if there are any solutions to this issue. Thanks!
>
>
> --
> *Ahmed Albalawi*
>
> Senior Associate Software Engineer • EP2 Tech - CuRE
>
> 571-668-3911 •  1680 Capital One Dr.
>






Re: Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Sean Owen
I think we already updated this in Spark 4. However for now you would have
to also include a JAR with the jakarta.* classes instead.
You are welcome to try Spark 4 now by building from master, but it's far
from release.

On Thu, Oct 5, 2023 at 11:53 AM Ahmed Albalawi
 wrote:

> Hello team,
>
> We are in the process of upgrading one of our apps to Spring Boot 3.x
> while using Spark, and we have encountered an issue with Spark
> compatibility, specifically with Jakarta Servlet. Spring Boot 3.x uses
> Jakarta Servlet while Spark uses Javax Servlet. Can we get some guidance on
> how to upgrade to Spring Boot 3.x while continuing to use Spark.
>
> The specific error is listed below:
>
> java.lang.NoClassDefFoundError: javax/servlet/Servlet
> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
> at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
>
> The error comes up when we try to run a mvn clean install, and the issue is 
> in our test cases. This issue happens specifically when we build our spark 
> session. The line of code it traces down to is as follows:
>
> *session = 
> SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*
>
> What we have tried:
>
> - We noticed according to this post 
> ,
>  there are no compatible versions of spark using version 5 of the Jakarta 
> Servlet API
>
> - We've tried 
> 
>  using the maven shade plugin to use jakarta instead of javax, but are 
> running into some other issues with this.
> - We've also looked at the following 
> 
>  to use jakarta 4.x with jersey 2.x and still have an issue with the servlet
>
>
> Please let us know if there are any solutions to this issue. Thanks!
>
>
> --
> *Ahmed Albalawi*
>
> Senior Associate Software Engineer • EP2 Tech - CuRE
>
> 571-668-3911 •  1680 Capital One Dr.


Spark Compatibility with Spring Boot 3.x

2023-10-05 Thread Ahmed Albalawi
Hello team,

We are in the process of upgrading one of our apps to Spring Boot 3.x while
using Spark, and we have encountered an issue with Spark compatibility,
specifically with Jakarta Servlet. Spring Boot 3.x uses Jakarta Servlet
while Spark uses Javax Servlet. Can we get some guidance on how to upgrade
to Spring Boot 3.x while continuing to use Spark.

The specific error is listed below:

java.lang.NoClassDefFoundError: javax/servlet/Servlet
at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:239)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:503)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2888)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)

The error comes up when we try to run a mvn clean install, and the
issue is in our test cases. This issue happens specifically when we
build our spark session. The line of code it traces down to is as
follows:

*session = 
SparkSession.builder().sparkContext(SparkContext.getOrCreate(sparkConf)).getOrCreate();*

What we have tried:

- We noticed according to this post
,
there are no compatible versions of spark using version 5 of the
Jakarta Servlet API

- We've tried 

using the maven shade plugin to use jakarta instead of javax, but are
running into some other issues with this.
- We've also looked at the following

to use jakarta 4.x with jersey 2.x and still have an issue with the
servlet


Please let us know if there are any solutions to this issue. Thanks!


-- 
*Ahmed Albalawi*

Senior Associate Software Engineer • EP2 Tech - CuRE

571-668-3911 •  1680 Capital One Dr.






Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Mich Talebzadeh
The fact that you have 60 partitions or brokers in Kafka is not directly
correlated to the number of Spark Structured Streaming (SSS) executors by
itself. See below.

Spark starts with 200 shuffle partitions by default
(spark.sql.shuffle.partitions). However, by default, Spark/PySpark creates
input partitions equal to the number of CPU cores in the node, the so-called
vcores, so it also depends on the number of nodes you are using in your
Spark cluster.

Until you have done a PoC you do not need to worry about repartition(10) in
your writeStream. I suggest that for now you remove that parameter and
observe the Spark processing through the Spark GUI (default port 4040), in
particular the page on "Structured Streaming". Your sink is Delta Lake,
which is no different from any other data warehouse such as Google BigQuery.

My general advice: the usual thing to watch from the Spark GUI is

Processing Time (Process Rate) + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink (Delta Lake) has an issue absorbing data in a timely manner as
per the formula above, you will see the effect on the Processing Rate.

Batch Interval here reflects the rate at which the upstream source sends
messages through Kafka. We can start by assuming that the rate of increase
in the number of messages processed (processing time) will require
additional reserved capacity. We can anticipate a heuristic 70% (~1 SD)
increase in processing time, so in theory you should be able to handle all
this work within the batch interval.

The parameter which I think many deploy is
spark.streaming.backpressure.enabled
(spark.conf.set("spark.streaming.backpressure.enabled", "true")). The
central idea is that if a component is struggling to keep up, it should
communicate to upstream components and get them to reduce the load. In the
context of Spark Streaming, the receiver is the upstream component which
gets notified if the executors cannot keep up. There are a number of
occasions when this will happen (not necessarily just a spike in the
incoming messages). For example:

   - Streaming Source: Unexpected short burst of incoming messages in
   source system
   - YARN: Lost Spark executors due to node(s) failure
   - External Sink System: High load on external systems such as Delta
   Lake, BigQuery etc

Without backpressure, microbatches queue up over time and the scheduling
delay increases (check Operation Duration from GUI).

The next parameter I think of is spark.streaming.backpressure.pid.minRate.
It is the total number of records per second. It relies on
spark.streaming.kafka.maxRatePerPartition (not set by default), which is the
maximum rate (number of records per second) at which messages will be read
from each Kafka partition.

So spark.streaming.backpressure.pid.minRate starts with

n (total number of Kafka partitions)
* spark.streaming.kafka.maxRatePerPartition * Batch Interval

spark.streaming.kafka.maxRatePerPartition is used to control the maximum
rate of data ingestion from Kafka per partition. Kafka topics can have
multiple partitions, and Spark Streaming processes data in parallel by
reading from these partitions.
If you set spark.streaming.kafka.maxRatePerPartition to 1000, Spark
Streaming will consume data from each Kafka partition at a rate of up to
1000 messages per second.

So in your case, if you set it to 1000, it works out as something like

60 * 1000 * Batch Interval (in seconds)
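
A hedged worked example of that formula, assuming a 30-second batch interval (the interval itself is an assumption):

```python
kafka_partitions = 60
max_rate_per_partition = 1000    # spark.streaming.kafka.maxRatePerPartition
batch_interval_s = 30            # assumed batch interval

records_per_batch = kafka_partitions * max_rate_per_partition * batch_interval_s
print(records_per_batch)         # 1800000 records per micro-batch at most
```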

Of course I stand corrected.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 5 Oct 2023 at 05:54, Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
>   

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Perez
You can try the OPTIMIZE command of Delta Lake. That will help you for
sure, as it merges small files. It also depends on the file format; if you
are working with Parquet, small files should still not cause any issues.

P.
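
A minimal sketch of that compaction step, assuming a recent OSS Delta Lake release with the delta-spark Python package, a Delta-enabled SparkSession named spark, and a placeholder table path:

```python
from delta.tables import DeltaTable

# compact the table's small files in place
DeltaTable.forPath(spark, "/mnt/delta/events").optimize().executeCompaction()

# equivalent SQL form
spark.sql("OPTIMIZE delta.`/mnt/delta/events`")
```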

On Thu, Oct 5, 2023 at 10:55 AM Shao Yang Hong
 wrote:

> Hi Raghavendra,
>
> Yes, we are trying to reduce the number of files in delta as well (the
> small file problem [0][1]).
>
> We already have a scheduled app to compact files, but the number of
> files is still large, at 14K files per day.
>
> [0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
> [1]:
> https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
>
> On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
>  wrote:
> >
> > Hi,
> > What is the purpose for which you want to use repartition() .. to reduce
> the number of files in delta?
> > Also note that there is an alternative option of using coalesce()
> instead of repartition().
> > --
> > Raghavendra
> >
> >
> > On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong <
> shaoyang.h...@ninjavan.co.invalid> wrote:
> >>
> >> Hi all on user@spark:
> >>
> >> We are looking for advice and suggestions on how to tune the
> >> .repartition() parameter.
> >>
> >> We are using Spark Streaming on our data pipeline to consume messages
> >> and persist them to a Delta Lake
> >> (https://delta.io/learn/getting-started/).
> >>
> >> We read messages from a Kafka topic, then add a generated date column
> >> as a daily partitioning, and save these records to Delta Lake. We have
> >> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> >> (so 4 Kafka partitions per executor).
> >>
> >> How then, should we use .repartition()? Should we omit this parameter?
> >> Or set it to 15? or 4?
> >>
> >> Our code looks roughly like the below:
> >>
> >> ```
> >> df = (
> >> spark.readStream.format("kafka")
> >> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> >> .option("subscribe", os.environ["KAFKA_TOPIC"])
> >> .load()
> >> )
> >>
> >> table = (
> >> df.select(
> >> from_protobuf(
> >> "value", "table", "/opt/protobuf-desc/table.desc"
> >> ).alias("msg")
> >> )
> >> .withColumn("uuid", col("msg.uuid"))
> >> # etc other columns...
> >>
> >> # generated column for daily partitioning in Delta Lake
> >> .withColumn(CREATED_DATE,
> >> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
> >> .drop("msg")
> >> )
> >>
> >> query = (
> >> table
> >> .repartition(10).writeStream
> >> .queryName(APP_NAME)
> >> .outputMode("append")
> >> .format("delta")
> >> .partitionBy(CREATED_DATE)
> >> .option("checkpointLocation", os.environ["CHECKPOINT"])
> >> .start(os.environ["DELTA_PATH"])
> >> )
> >>
> >> query.awaitTermination()
> >> spark.stop()
> >> ```
> >>
> >> Any advice would be appreciated.
> >>
> >> --
> >> Best Regards,
> >> Shao Yang HONG
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
>
>
> --
> Best Regards,
> Shao Yang HONG
> Software Engineer, Pricing, Tech
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Prashant Sharma
Hi Sanket, more details might help here.

What does your Spark configuration look like?

What exactly was done when this happened?

On Thu, 5 Oct, 2023, 2:29 pm Agrawal, Sanket,
 wrote:

> Hello Everyone,
>
>
>
> We are trying to stream the changes in our Iceberg tables stored in AWS
> S3. We are achieving this through Spark-Iceberg Connector and using JAR
> files for Spark-AWS. Suddenly we have started receiving error “Connection
> pool shut down”.
>
>
>
> Spark Version: 3.4.1
>
> Iceberg: 1.3.1
>
>
>
> Any help or guidance would of great help.
>
>
>
> Thank You,
>
> Sanket A.
>
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Connection pool shut down in Spark Iceberg Streaming Connector

2023-10-05 Thread Agrawal, Sanket
Hello Everyone,

We are trying to stream the changes in our Iceberg tables stored in AWS S3. We 
are achieving this through Spark-Iceberg Connector and using JAR files for 
Spark-AWS. Suddenly we have started receiving error "Connection pool shut down".

Spark Version: 3.4.1
Iceberg: 1.3.1

Any help or guidance would be of great help.

Thank You,
Sanket A.




spark_iceberg_streaming.logs
Description: spark_iceberg_streaming.logs

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi Raghavendra,

Yes, we are trying to reduce the number of files in delta as well (the
small file problem [0][1]).

We already have a scheduled app to compact files, but the number of
files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
 wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to reduce the 
> number of files in delta?
> Also note that there is an alternative option of using coalesce() instead of 
> repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong 
>  wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the
>> .repartition() parameter.
>>
>> We are using Spark Streaming on our data pipeline to consume messages
>> and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> as a daily partitioning, and save these records to Delta Lake. We have
>> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
>> (so 4 Kafka partitions per executor).
>>
>> How then, should we use .repartition()? Should we omit this parameter?
>> Or set it to 15? or 4?
>>
>> Our code looks roughly like the below:
>>
>> ```
>> df = (
>> spark.readStream.format("kafka")
>> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>> .option("subscribe", os.environ["KAFKA_TOPIC"])
>> .load()
>> )
>>
>> table = (
>> df.select(
>> from_protobuf(
>> "value", "table", "/opt/protobuf-desc/table.desc"
>> ).alias("msg")
>> )
>> .withColumn("uuid", col("msg.uuid"))
>> # etc other columns...
>>
>> # generated column for daily partitioning in Delta Lake
>> .withColumn(CREATED_DATE,
>> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>> .drop("msg")
>> )
>>
>> query = (
>> table
>> .repartition(10).writeStream
>> .queryName(APP_NAME)
>> .outputMode("append")
>> .format("delta")
>> .partitionBy(CREATED_DATE)
>> .option("checkpointLocation", os.environ["CHECKPOINT"])
>> .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>


-- 
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi all on user@spark:

We are looking for advice and suggestions on how to tune the
.repartition() parameter.

We are using Spark Streaming on our data pipeline to consume messages
and persist them to a Delta Lake
(https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, then add a generated date column
as a daily partitioning, and save these records to Delta Lake. We have
60 Kafka partitions on the Kafka topic, 15 Spark executor instances
(so 4 Kafka partitions per executor).

How then, should we use .repartition()? Should we omit this parameter?
Or set it to 15? or 4?

Our code looks roughly like the below:

```
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
.option("subscribe", os.environ["KAFKA_TOPIC"])
.load()
)

table = (
df.select(
from_protobuf(
"value", "table", "/opt/protobuf-desc/table.desc"
).alias("msg")
)
.withColumn("uuid", col("msg.uuid"))
# etc other columns...

# generated column for daily partitioning in Delta Lake
.withColumn(CREATED_DATE,
date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
.drop("msg")
)

query = (
table
.repartition(10).writeStream
.queryName(APP_NAME)
.outputMode("append")
.format("delta")
.partitionBy(CREATED_DATE)
.option("checkpointLocation", os.environ["CHECKPOINT"])
.start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```

Any advice would be appreciated.

-- 
Best Regards,
Shao Yang HONG

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Raghavendra Ganesh
Hi,
What is the purpose for which you want to use repartition() .. to reduce
the number of files in delta?
Also note that there is an alternative option of using coalesce() instead
of repartition().
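
A hedged sketch of the coalesce() variant, reusing table, APP_NAME, CREATED_DATE and the environment variables from the snippet in the question; the value 4 is only an assumption:

```python
query = (
    table
    .coalesce(4)                       # merges partitions without a full shuffle
    .writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)
```
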
--
Raghavendra


On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
> ).alias("msg")
> )
> .withColumn("uuid", col("msg.uuid"))
> # etc other columns...
>
> # generated column for daily partitioning in Delta Lake
> .withColumn(CREATED_DATE,
> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
> .drop("msg")
> )
>
> query = (
> table
> .repartition(10).writeStream
> .queryName(APP_NAME)
> .outputMode("append")
> .format("delta")
> .partitionBy(CREATED_DATE)
> .option("checkpointLocation", os.environ["CHECKPOINT"])
> .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi all on user@spark:

We are looking for advice and suggestions on how to tune the
.repartition() parameter.

We are using Spark Streaming on our data pipeline to consume messages
and persist them to a Delta Lake
(https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, then add a generated date column
as a daily partitioning, and save these records to Delta Lake. We have
60 Kafka partitions on the Kafka topic, 15 Spark executor instances
(so 4 Kafka partitions per executor).

How then, should we use .repartition()? Should we omit this parameter?
Or set it to 15? or 4?

Our code looks roughly like the below:

```
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
.option("subscribe", os.environ["KAFKA_TOPIC"])
.load()
)

table = (
df.select(
from_protobuf(
"value", "table", "/opt/protobuf-desc/table.desc"
).alias("msg")
)
.withColumn("uuid", col("msg.uuid"))
# etc other columns...

# generated column for daily partitioning in Delta Lake
.withColumn(CREATED_DATE,
date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
.drop("msg")
)

query = (
table
.repartition(10).writeStream
.queryName(APP_NAME)
.outputMode("append")
.format("delta")
.partitionBy(CREATED_DATE)
.option("checkpointLocation", os.environ["CHECKPOINT"])
.start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```

Any advice would be appreciated.

-- 
Best Regards,
Shao Yang HONG

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark Core]: Recomputation cost of a job due to executor failures

2023-10-04 Thread Faiz Halde
Hello,

Due to the way Spark implements shuffle, a loss of an executor sometimes
results in the recomputation of partitions that were lost

The definition of a *partition* here is the tuple (RDD-ids, partition id),
where RDD-ids is a sequence of RDD ids.

In our system, we define the unit of work performed for a job X as

work = count of tasks executed to complete the job X

We want to be able to segregate the *goodput* from this metric

Goodput is defined as - Had there been 0 failures in a cluster, how many
tasks spark had to compute to complete this job

Using the event listener, would the following work?

1. Build a hashmap of type [(RDD-ids, partition), int] with default value =
0
2. For each task T, hashmap[(T.RDD-ids, T.partition-id)] += 1

The assumption here is that spark will never recompute a *partition* twice
( when there are no failures ). Is this assumption true?

So for any entry, a value of greater than 1 means that the particular
partition identified by the tuple ( RDD-ids, partition id ) was recomputed
because spark thought the partition was "lost"

Given the above data structure, the recomputation cost would be
1 - (hashmap.size() / sum(hashmap.values))

Thanks
Faiz
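
A minimal sketch of the bookkeeping described above in plain Python, using hypothetical task records (the listener wiring is omitted; the RDD-id tuples and partition ids are made up for illustration):

```python
from collections import defaultdict

# each completed task summarised as (rdd_ids, partition_id)
completed_tasks = [
    ((1, 4, 7), 0),
    ((1, 4, 7), 1),
    ((1, 4, 7), 1),   # partition 1 recomputed after an executor loss
]

counts = defaultdict(int)
for rdd_ids, partition_id in completed_tasks:
    counts[(rdd_ids, partition_id)] += 1

total_tasks = sum(counts.values())   # work actually performed
goodput_tasks = len(counts)          # distinct partitions computed
recomputation_cost = 1 - goodput_tasks / total_tasks

print(goodput_tasks, total_tasks, round(recomputation_cost, 2))  # 2 3 0.33
```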


Updating delta file column data

2023-10-02 Thread Karthick Nk
Hi community members,

In Databricks ADLS2 Delta tables, I need to perform the operation below;
could you help me with your thoughts?

I have Delta tables with one column of data type string, which contains
JSON data as a string. I need to do the following:
1. Update one particular field value in the JSON and write it back into
the same column of the data.

Example:

In the string column, inside the JSON, I have one field whose value is in
hexadecimal, like { "version": "0xabcd1234" }.

I have to convert this field into the corresponding integer value and
update it back into the same string column's JSON value.
Note: I have to perform this operation within this column. This column has
data type string in the Delta table.

Could you suggest a sample example?

Thanks in advance.
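
A hedged sketch of one way to do this with from_json / to_json and conv(); the table path and schema are placeholders, the JSON is assumed to contain only the version field, and a Delta-enabled SparkSession (delta-spark package) is assumed:

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("hex-to-int").getOrCreate()

delta_path = "/mnt/delta/my_table"                               # placeholder
json_schema = StructType([StructField("version", StringType())])

df = spark.read.format("delta").load(delta_path)

parsed = df.withColumn("j", F.from_json("data", json_schema))

# strip the 0x prefix, convert hex -> decimal, cast to long
converted = parsed.withColumn(
    "version_int",
    F.conv(F.regexp_replace(F.col("j.version"), "^0x", ""), 16, 10).cast("long"),
)

# re-serialise and overwrite the original string column
result = converted.withColumn(
    "data", F.to_json(F.struct(F.col("version_int").alias("version")))
).drop("j", "version_int")

# write back (overwrite shown for simplicity; a MERGE would be more surgical)
result.write.format("delta").mode("overwrite").save(delta_path)
```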


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jon Rodríguez Aranguren
Dear Jörn Franke, Jayabindu Singh and Spark Community members,

Thank you profoundly for your initial insights. I feel it's necessary to
provide more precision on our setup to facilitate a deeper understanding.

We're interfacing with S3 Compatible storages, but our operational context
is somewhat distinct. Our infrastructure doesn't lean on conventional cloud
providers like AWS. Instead, we've architected our environment on
On-Premise Kubernetes distributions, specifically k0s and Openshift.

Our objective extends beyond just handling S3 keys. We're orchestrating a
solution that integrates Azure SPNs, API Credentials, and other sensitive
credentials, intending to make Kubernetes' native secrets our central
management hub. The aspiration is to have a universally deployable JAR, one
that can function unmodified across different ecosystems like EMR,
Databricks (on both AWS and Azure), etc. Platforms like Databricks have
already made strides in this direction, allowing secrets to be woven
directly into the Spark Conf through mechanisms like
{{secret_scope/secret_name}}, which are resolved dynamically.

The spark-on-k8s-operator's user guide suggests the feasibility of mounting
secrets. However, a gap exists in our understanding of how to subsequently
access these mounted secret values within the Spark application's context.

Here lies my inquiry: is the spark-on-k8s-operator currently equipped to
support this level of integration? If it does, any elucidation on the
method or best practices would be pivotal for our project. Alternatively,
if you could point me to resources or community experts who have tackled
similar challenges, it would be of immense assistance.

Thank you for bearing with the intricacies of our query, and I appreciate
your continued guidance in this endeavor.

Warm regards,

Jon Rodríguez Aranguren.
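
For what it's worth, a hedged sketch of the plain Spark-on-Kubernetes properties for consuming mounted secrets (the secret name, key names, mount path and env var names are all placeholders; the operator exposes equivalent settings in its CRD, which are not shown here, and submission-time settings such as the k8s master URL and container image are omitted):

```python
import os
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("k8s-secrets-demo")
    # mount the whole Kubernetes secret as files under /etc/secrets
    .config("spark.kubernetes.driver.secrets.my-s3-secret", "/etc/secrets")
    .config("spark.kubernetes.executor.secrets.my-s3-secret", "/etc/secrets")
    # or surface individual keys as environment variables
    .config("spark.kubernetes.driver.secretKeyRef.S3_ACCESS_KEY", "my-s3-secret:access-key")
    .config("spark.kubernetes.executor.secretKeyRef.S3_ACCESS_KEY", "my-s3-secret:access-key")
    .getOrCreate()
)

# inside the application the values are then just env vars / mounted files
access_key = os.environ.get("S3_ACCESS_KEY")
with open("/etc/secrets/access-key") as f:
    access_key_from_file = f.read().strip()
```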

On Sat, 30 Sept 2023 at 23:19, Jayabindu Singh () wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8 and utilizing
> IAM.
> It will save you from secret management headaches and also allows a lot
> more flexibility on access control and option to allow access to multiple
> S3 buckets in the same pod.
> We have implemented this across Azure, Google and AWS. Azure does require
> some extra work to make it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:
>
>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>> - even AWS recommend against using this for anything (cf eg
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>> ).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to the
>> pods (for EKS it means using OIDC, cf
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren <
>> jon.r.arangu...@gmail.com>:
>>
>> 
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and it
>> is my hope that some of you might have insights on a specific challenge I'm
>> facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
>>
>>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
There is nowadays more of a trend to move away from static
credentials/certificates that are stored in a secret vault. The issue is that
their rotation is complex, once they are leaked they can be abused, and making
minimal permissions feasible is cumbersome. That is why keyless approaches are
used for A2A access (workload identity federation was mentioned). E.g. in AWS
EKS you would build this on OIDC
(https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html)
and configure this instead of using secrets. Similar approaches exist in other
clouds and even on-premise (e.g. SPIFFE, https://spiffe.io/).

Whether this will become the standard is difficult to say - for sure they seem
to be easier to manage.

Since you seem to have a Kubernetes setup, which means a lot of extra work,
infrastructure cost and security issues per cloud/data centre, workload
identity federation may ease this compared to a secret store.




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
With OIDC something comparable is possible:
https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html





Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Mich Talebzadeh
It seems that workload identity is not available on AWS. Workload Identity
replaces the need to use Metadata concealment on exposed storage such as s3
and gcs. The sensitive metadata protected by metadata concealment is also
protected by Workload Identity.

Both Google Cloud Kubernetes (GKE) and Azure Kubernetes Service support
Workload Identity. Taking notes from Google Cloud: "Workload Identity is the
recommended way for your workloads running on Google Kubernetes Engine (GKE)
to access Google Cloud services in a secure and manageable way."


HTH


Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 1 Oct 2023 at 06:36, Jayabindu Singh  wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8 and utilizing
> IAM.
> It will save you from secret management headaches and also allows a lot
> more flexibility on access control and option to allow access to multiple
> S3 buckets in the same pod.
> We have implemented this across Azure, Google and AWS. Azure does require
> some extra work to make it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:
>
>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>> - even AWS recommend against using this for anything (cf eg
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>> ).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to the
>> pods (for EKS it means using OIDC, cf
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren <
>> jon.r.arangu...@gmail.com>:
>>
>> 
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and it
>> is my hope that some of you might have insights on a specific challenge I'm
>> facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
>>
>>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jayabindu Singh
Hi Jon,

Using IAM as suggested by Jorn is the best approach.
We recently moved our spark workload from HDP to Spark on K8 and utilizing
IAM.
It will save you from secret management headaches and also allows a lot
more flexibility on access control and option to allow access to multiple
S3 buckets in the same pod.
We have implemented this across Azure, Google and AWS. Azure does require
some extra work to make it work.

On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:

> Don’t use static iam (s3) credentials. It is an outdated insecure method -
> even AWS recommend against using this for anything (cf eg
> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
> ).
> It is almost a guarantee to get your data stolen and your account
> manipulated.
>
> If you need to use kubernetes (which has its own very problematic security
> issues) then assign AWS IAM roles with minimal permissions to the pods (for
> EKS it means using OIDC, cf
> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>
> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren <
> jon.r.arangu...@gmail.com>:
>
> 
> Dear Spark Community Members,
>
> I trust this message finds you all in good health and spirits.
>
> I'm reaching out to the collective expertise of this esteemed community
> with a query regarding Spark on Kubernetes. As a newcomer, I have always
> admired the depth and breadth of knowledge shared within this forum, and it
> is my hope that some of you might have insights on a specific challenge I'm
> facing.
>
> I am currently trying to configure multiple Kubernetes secrets, notably
> multiple S3 keys, at the SparkConf level for a Spark application. My
> objective is to understand the best approach or methods to ensure that
> these secrets can be smoothly accessed by the Spark application.
>
> If any of you have previously encountered this scenario or possess
> relevant insights on the matter, your guidance would be highly beneficial.
>
> Thank you for your time and consideration. I'm eager to learn from the
> experiences and knowledge present within this community.
>
> Warm regards,
> Jon
>
>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jörn Franke
Don’t use static iam (s3) credentials. It is an outdated insecure method - even 
AWS recommend against using this for anything (cf eg 
https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html).
It is almost a guarantee to get your data stolen and your account manipulated. 

If you need to use kubernetes (which has its own very problematic security 
issues) then assign AWS IAM roles with minimal permissions to the pods (for EKS 
it means using OIDC, cf 
https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).

> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren 
> :
> 
> 
> Dear Spark Community Members,
> 
> I trust this message finds you all in good health and spirits.
> 
> I'm reaching out to the collective expertise of this esteemed community with 
> a query regarding Spark on Kubernetes. As a newcomer, I have always admired 
> the depth and breadth of knowledge shared within this forum, and it is my 
> hope that some of you might have insights on a specific challenge I'm facing.
> 
> I am currently trying to configure multiple Kubernetes secrets, notably 
> multiple S3 keys, at the SparkConf level for a Spark application. My 
> objective is to understand the best approach or methods to ensure that these 
> secrets can be smoothly accessed by the Spark application.
> 
> If any of you have previously encountered this scenario or possess relevant 
> insights on the matter, your guidance would be highly beneficial.
> 
> Thank you for your time and consideration. I'm eager to learn from the 
> experiences and knowledge present within this community.
> 
> Warm regards,
> Jon


using facebook Prophet + pyspark for forecasting - Dataframe has less than 2 non-NaN rows

2023-09-29 Thread karan alang
Hello - Anyone used Prophet + pyspark for forecasting ?
I'm trying to backfill forecasts, and running into issues (error -
Dataframe has less than 2 non-NaN rows)

I'm removing all records with NaN values, yet getting this error.

details are in stackoverflow link ->
https://stackoverflow.com/questions/77205021/facebook-prophet-dataframe-has-less-than-2-non-nan-rows

any ideas on how to fix/debug this?

tia!
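
One pattern worth checking, sketched below with hypothetical column names: even
after dropping NaN records globally, an individual series can still end up with
fewer than 2 usable rows once the data is split per series for backfilling,
which raises exactly this Prophet error; counting usable rows per series makes
those cases visible so they can be skipped:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the frame being backfilled; column names are assumptions:
# "key" identifies one series, "ds"/"y" are Prophet's input columns.
df = spark.createDataFrame(
    [("a", "2023-09-01", 1.0),
     ("a", "2023-09-02", 2.0),
     ("b", "2023-09-01", float("nan"))],
    ["key", "ds", "y"],
)

# Count rows per series that Prophet can actually use (non-null, non-NaN y).
counts = df.groupBy("key").agg(
    F.count(F.when(F.col("y").isNotNull() & ~F.isnan("y"), 1)).alias("usable_rows")
)

# Series with fewer than 2 usable rows will raise Prophet's
# "Dataframe has less than 2 non-NaN rows" error and need to be skipped.
counts.filter("usable_rows < 2").show()   # shows key "b" with 0 usable rows
```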


Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-29 Thread Jon Rodríguez Aranguren
Dear Spark Community Members,

I trust this message finds you all in good health and spirits.

I'm reaching out to the collective expertise of this esteemed community
with a query regarding Spark on Kubernetes. As a newcomer, I have always
admired the depth and breadth of knowledge shared within this forum, and it
is my hope that some of you might have insights on a specific challenge I'm
facing.

I am currently trying to configure multiple Kubernetes secrets, notably
multiple S3 keys, at the SparkConf level for a Spark application. My
objective is to understand the best approach or methods to ensure that
these secrets can be smoothly accessed by the Spark application.

If any of you have previously encountered this scenario or possess relevant
insights on the matter, your guidance would be highly beneficial.

Thank you for your time and consideration. I'm eager to learn from the
experiences and knowledge present within this community.

Warm regards,
Jon


Re: Inquiry about Processing Speed

2023-09-28 Thread Jack Goodson
Hi Haseeb,

I think the user mailing list is what you're looking for; people are usually
pretty active on here if you present a direct question about Apache Spark.
I've linked the community guidelines below, which say which mailing lists are
for what, etc.

https://spark.apache.org/community.html

There are a few resources below for cluster management and code performance
tweaks, but if you write declaratively in Spark the planning engine does a
pretty good job of optimising jobs. It's hard to answer without a specific
problem presented; hope the docs get you started.

https://spark.apache.org/docs/latest/cluster-overview.html

https://spark.apache.org/docs/latest/tuning.html

https://spark.apache.org/docs/latest/sql-performance-tuning.html

On Thu, Sep 28, 2023 at 3:22 PM Haseeb Khalid 
wrote:

> Dear Support Team,
>
> I hope this message finds you well. My name is Haseeb Khalid, and I am
> reaching out to discuss a scenario related to processing speed in Apache
> Spark.
>
> I have been utilizing these technologies in our projects, and we have
> encountered a specific use case where we are seeking to optimize processing
> speed. Given the critical nature of this requirement, I would greatly
> appreciate the opportunity to discuss this with a knowledgeable
> representative from your team.
>
> I am particularly interested in understanding any best practices,
> configuration tweaks, or architectural considerations that can be employed
> to enhance processing speed in our specific scenario.
>
> Would it be possible to schedule a call or exchange emails to delve deeper
> into this matter? I am available at your convenience and can accommodate
> any preferred mode of communication.
>
> I genuinely value the expertise of the Apache Spark communities and
> believe that your insights will be instrumental in achieving our objectives.
>
> Thank you very much for your time and consideration. I look forward to
> hearing from you soon.
>
> --
>
> Thanks & Best Regards,
>
> *Haseeb Khalid*
>
> *Senior Data Analyst*
>
> +92 306 4436 790
>
>
>


Thread dump only shows 10 shuffle clients

2023-09-28 Thread Nebi Aydin
Hi all,
I set the spark.shuffle.io.serverThreads and spark.shuffle.io.clientThreads
to *800*
But when I click Thread dump from the Spark UI for the executor: I only see
10 shuffle client threads for the executor.
Is that normal, am I missing something?


Re: Inquiry about Processing Speed

2023-09-27 Thread Deepak Goel
Hi

"Processing Speed" can be at a software level (Code Optimization) and at a
hardware level (Capacity Planning)

Deepak
"The greatness of a nation can be judged by the way its animals are treated
- Mahatma Gandhi"

+91 73500 12833
deic...@gmail.com

Facebook: https://www.facebook.com/deicool
LinkedIn: www.linkedin.com/in/deicool

"Plant a Tree, Go Green"

Make In India : http://www.makeinindia.com/home


On Thu, Sep 28, 2023 at 7:53 AM Haseeb Khalid 
wrote:

> Dear Support Team,
>
> I hope this message finds you well. My name is Haseeb Khalid, and I am
> reaching out to discuss a scenario related to processing speed in Apache
> Spark.
>
> I have been utilizing these technologies in our projects, and we have
> encountered a specific use case where we are seeking to optimize processing
> speed. Given the critical nature of this requirement, I would greatly
> appreciate the opportunity to discuss this with a knowledgeable
> representative from your team.
>
> I am particularly interested in understanding any best practices,
> configuration tweaks, or architectural considerations that can be employed
> to enhance processing speed in our specific scenario.
>
> Would it be possible to schedule a call or exchange emails to delve deeper
> into this matter? I am available at your convenience and can accommodate
> any preferred mode of communication.
>
> I genuinely value the expertise of the Apache Spark communities and
> believe that your insights will be instrumental in achieving our objectives.
>
> Thank you very much for your time and consideration. I look forward to
> hearing from you soon.
>
> --
>
> Thanks & Best Regards,
>
> *Haseeb Khalid*
>
> *Senior Data Analyst*
>
> +92 306 4436 790
>
>
>


Files io threads vs shuffle io threads

2023-09-27 Thread Nebi Aydin
Hi all,
Can someone explain the difference between
Files io threads and shuffle io threads, as I couldn't find any explanation.
I'm specifically asking about these:
spark.rpc.io.serverThreads
spark.rpc.io.clientThreads
spark.rpc.io.threads

spark.files.io.serverThreads
spark.files.io.clientThreads
spark.files.io.threads


Inquiry about Processing Speed

2023-09-27 Thread Haseeb Khalid
Dear Support Team,

I hope this message finds you well. My name is Haseeb Khalid, and I am
reaching out to discuss a scenario related to processing speed in Apache
Spark.

I have been utilizing these technologies in our projects, and we have
encountered a specific use case where we are seeking to optimize processing
speed. Given the critical nature of this requirement, I would greatly
appreciate the opportunity to discuss this with a knowledgeable
representative from your team.

I am particularly interested in understanding any best practices,
configuration tweaks, or architectural considerations that can be employed
to enhance processing speed in our specific scenario.

Would it be possible to schedule a call or exchange emails to delve deeper
into this matter? I am available at your convenience and can accommodate
any preferred mode of communication.

I genuinely value the expertise of the Apache Spark communities and believe
that your insights will be instrumental in achieving our objectives.

Thank you very much for your time and consideration. I look forward to
hearing from you soon.

-- 

Thanks & Best Regards,

*Haseeb Khalid*

*Senior Data Analyst*

+92 306 4436 790


Reading Glue Catalog Views through Spark.

2023-09-25 Thread Agrawal, Sanket
Hello Everyone,

We have setup spark and setup Iceberg-Glue connectors as mentioned at 
https://iceberg.apache.org/docs/latest/aws/ to integrate Spark, Iceberg, and 
AWS Glue Catalog. We are able to read tables through this but we are unable to 
read data through views. PFB, the error:

pyspark.errors.exceptions.captured.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] 
The table or view `db`.`vw_name ` cannot be found. Verify the spelling and 
correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() 
output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; 
line 1 pos 14;
'GlobalLimit 1
+- 'LocalLimit 1
   +- 'Project [*]
  +- 'UnresolvedRelation [db, vw_name], [], false


Any help or guidance would be of great help.

Thank You,
Sanket A.


This message (including any attachments) contains confidential information 
intended for a specific individual and purpose, and is protected by law. If you 
are not the intended recipient, you should delete this message and any 
disclosure, copying, or distribution of this message, or the taking of any 
action based on it, by you is strictly prohibited.

Deloitte refers to a Deloitte member firm, one of its related entities, or 
Deloitte Touche Tohmatsu Limited ("DTTL"). Each Deloitte member firm is a 
separate legal entity and a member of DTTL. DTTL does not provide services to 
clients. Please see www.deloitte.com/about to learn more.

v.E.1


[PySpark][Spark logs] Is it possible to dynamically customize Spark logs?

2023-09-25 Thread Ayman Rekik
Hello,

What would be the right way, if any, to inject a runtime variable into Spark
logs, so that, for example, if Spark (driver/worker) logs some
info/warning/error message, the variable is output there (in order to help
filter logs for monitoring and troubleshooting)?


Regards,




[ANNOUNCE] Apache Kyuubi released 1.7.3

2023-09-25 Thread Zhen Wang
Hi all,

The Apache Kyuubi community is pleased to announce that
Apache Kyuubi 1.7.3 has been released!

Apache Kyuubi is a distributed and multi-tenant gateway to provide
serverless SQL on data warehouses and lakehouses.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and lakehouses.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark at the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.7.3.html

To learn more about Apache Kyuubi, please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community
who made this release possible!

Thanks,
On behalf of Apache Kyuubi community

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Connect Multi-tenant Support

2023-09-22 Thread Kezhi Xiong
Hi,

>From Spark Connect's official site's image, it mentions the "Multi-tenant
Application Gateway" on driver. Are there any more documents about it? Can
I know how users can utilize such a feature?

Thanks,
Kezhi


Re: Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

2023-09-22 Thread Karthick
Hi All,

It will be helpful if we gave any pointers to the problem addressed.

Thanks
Karthick.

On Wed, Sep 20, 2023 at 3:03 PM Gowtham S  wrote:

> Hi Spark Community,
>
> Thank you for bringing up this issue. We've also encountered the same
> challenge and are actively working on finding a solution. It's reassuring
> to know that we're not alone in this.
>
> If you have any insights or suggestions regarding how to address this
> problem, please feel free to share them.
>
> Looking forward to hearing from others who might have encountered similar
> issues.
>
>
> Thanks and regards,
> Gowtham S
>
>
> On Tue, 19 Sept 2023 at 17:23, Karthick 
> wrote:
>
>> Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
>>
>> Dear Spark Community,
>>
>> I recently reached out to the Apache Flink community for assistance with
>> a critical issue we are facing in our IoT platform, which relies on Apache
>> Kafka and real-time data processing. We received some valuable insights and
>> suggestions from the Apache Flink community, and now, we would like to seek
>> your expertise and guidance on the same problem.
>>
>> In our IoT ecosystem, we are dealing with data streams from thousands of
>> devices, each uniquely identified. To maintain data integrity and ordering,
>> we have configured a Kafka topic with ten partitions, ensuring that each
>> device's data is directed to its respective partition based on its unique
>> identifier. While this architectural choice has been effective in
>> maintaining data order, it has unveiled a significant challenge:
>>
>> *Slow Consumer and Data Skew Problem:* When a single device experiences
>> processing delays, it acts as a bottleneck within the Kafka partition,
>> leading to delays in processing data from other devices sharing the same
>> partition. This issue severely affects the efficiency and scalability of
>> our entire data processing pipeline.
>>
>> Here are some key details:
>>
>> - Number of Devices: 1000 (with potential growth)
>> - Target Message Rate: 1000 messages per second (with expected growth)
>> - Kafka Partitions: 10 (some partitions are overloaded)
>> - We are planning to migrate from Apache Storm to Apache Flink/Spark.
>>
>> We are actively seeking guidance on the following aspects:
>>
>> *1. Independent Device Data Processing*: We require a strategy that
>> guarantees one device's processing speed does not affect other devices in
>> the same Kafka partition. In other words, we need a solution that ensures
>> the independent processing of each device's data.
>>
>> *2. Custom Partitioning Strategy:* We are looking for a custom
>> partitioning strategy to distribute the load evenly across Kafka
>> partitions. Currently, we are using Murmur hashing with the device's unique
>> identifier, but we are open to exploring alternative partitioning
>> strategies.
>>
>> *3. Determining Kafka Partition Count:* We seek guidance on how to
>> determine the optimal number of Kafka partitions to handle the target
>> message rate efficiently.
>>
>> *4. Handling Data Skew:* Strategies or techniques for handling data skew
>> within Apache Flink.
>>
>> We believe that many in your community may have faced similar challenges
>> or possess valuable insights into addressing them. Your expertise and
>> experiences can greatly benefit our team and the broader community dealing
>> with real-time data processing.
>>
>> If you have any knowledge, solutions, or references to open-source
>> projects, libraries, or community-contributed solutions that align with our
>> requirements, we would be immensely grateful for your input.
>>
>> We appreciate your prompt attention to this matter and eagerly await your
>> responses and insights. Your support will be invaluable in helping us
>> overcome this critical challenge.
>>
>> Thank you for your time and consideration.
>>
>> Thanks & regards,
>> Karthick.
>>
>


Re: Parallel write to different partitions

2023-09-21 Thread Shrikant Prasad
Found this issue reported earlier but was bulk closed:
https://issues.apache.org/jira/browse/SPARK-27030

Regards,
Shrikant

On Fri, 22 Sep 2023 at 12:03 AM, Shrikant Prasad 
wrote:

> Hi all,
>
> We have multiple spark jobs running in parallel trying to write into same
> hive table but each job writing into different partition. This was working
> fine with Spark 2.3 and Hadoop 2.7.
>
> But after upgrading to Spark 3.2 and Hadoop 3.2.2, these parallel jobs are
> failing with FileNotFound exceptions for files under
> /warehouse/db/table/temporary/0/ directory.
>
> It seems earlier the temporary dir was created under the partition being
> written but now its created directly under the table directory which is
> causing concurrency issues with multiple jobs trying to cleanup the same
> temporary directory.
>
> Is there a way now to achieve parallel writes to different partitions of
> same table? Also any insight into what caused the change in behavior of
> temporary dir creation will be helpful.
>
> Thanks and regards,
> Shrikant
>


Parallel write to different partitions

2023-09-21 Thread Shrikant Prasad
Hi all,

We have multiple spark jobs running in parallel trying to write into same
hive table but each job writing into different partition. This was working
fine with Spark 2.3 and Hadoop 2.7.

But after upgrading to Spark 3.2 and Hadoop 3.2.2, these parallel jobs are
failing with FileNotFound exceptions for files under
/warehouse/db/table/temporary/0/ directory.

It seems earlier the temporary dir was created under the partition being
written but now its created directly under the table directory which is
causing concurrency issues with multiple jobs trying to cleanup the same
temporary directory.

Is there a way now to achieve parallel writes to different partitions of
same table? Also any insight into what caused the change in behavior of
temporary dir creation will be helpful.

Thanks and regards,
Shrikant


Re: Need to split incoming data into PM on time column and find the top 5 by volume of data

2023-09-21 Thread Mich Talebzadeh
In general you can probably do all this in spark-sql by reading in Hive
table through a DF in Pyspark, then creating a TempView on that DF, select
PM data through CAST() function and then use a windowing function to select
the top 5 with DENSE_RANK()

# Read the Hive table as a DataFrame
df = spark.read.table("hive.sample_data")

# Create a temporary view on the DataFrame
df.createOrReplaceTempView("tmp")

# Spark SQL has no TIME type, so filter on the formatted time-of-day string
sqltext = """
SELECT incoming_ip, total_volume
FROM (
  SELECT incoming_ip,
         SUM(volume) AS total_volume,
         DENSE_RANK() OVER (ORDER BY SUM(volume) DESC) AS rank
  FROM tmp
  WHERE date_format(time_in, 'HH:mm:ss') BETWEEN '12:00:00' AND '23:59:59'
  GROUP BY incoming_ip
) ranked_ips
WHERE rank <= 5
"""
spark.sql(sqltext).show(5, False)
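
The same top-5 selection can also be sketched with the DataFrame API, assuming
the table and column names above:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.table("hive.sample_data")

# PM hours: 12:00:00 - 23:59:59
pm = df.filter(F.hour("time_in") >= 12)

totals = pm.groupBy("incoming_ip").agg(F.sum("volume").alias("total_volume"))

w = Window.orderBy(F.desc("total_volume"))
top5 = (totals
        .withColumn("rank", F.dense_rank().over(w))
        .filter(F.col("rank") <= 5)
        .select("incoming_ip", "total_volume"))

top5.show(truncate=False)
```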

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 21 Sept 2023 at 18:03, ashok34...@yahoo.com.INVALID
 wrote:

> Hello gurus,
>
> I have a Hive table created as below (there are more columns)
>
> CREATE TABLE hive.sample_data ( incoming_ip STRING, time_in TIMESTAMP,
> volume INT );
>
> Data is stored in that table
>
> In PySpark, I want to  select the top 5 incoming IP addresses with the
> highest total volume of data transferred during the PM hours. PM hours are
> decided by the column time_in with values like '00:45:00', '11:35:00',
> '18:25:00'
>
> Any advice is appreciated.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Need to split incoming data into PM on time column and find the top 5 by volume of data

2023-09-21 Thread ashok34...@yahoo.com.INVALID
Hello gurus,

I have a Hive table created as below (there are more columns)

CREATE TABLE hive.sample_data ( incoming_ip STRING, time_in TIMESTAMP, volume 
INT );

Data is stored in that table

In PySpark, I want to  select the top 5 incoming IP addresses with the highest 
total volume of data transferred during the PM hours. PM hours are decided by 
the column time_in with values like '00:45:00', '11:35:00', '18:25:00'

Any advice is appreciated.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: PySpark 3.5.0 on PyPI

2023-09-20 Thread Kezhi Xiong
Oh, I saw it now. Thanks!

On Wed, Sep 20, 2023 at 1:04 PM Sean Owen  wrote:

> [ External sender. Exercise caution. ]
>
> I think the announcement mentioned there were some issues with pypi and
> the upload size this time. I am sure it's intended to be there when
> possible.
>
> On Wed, Sep 20, 2023, 3:00 PM Kezhi Xiong 
> wrote:
>
>> Hi,
>>
>> Are there any plans to upload PySpark 3.5.0 to PyPI (
>> https://pypi.org/project/pyspark/)? It's still 3.4.1.
>>
>> Thanks,
>> Kezhi
>>
>>
>>


Re: PySpark 3.5.0 on PyPI

2023-09-20 Thread Sean Owen
I think the announcement mentioned there were some issues with pypi and the
upload size this time. I am sure it's intended to be there when possible.

On Wed, Sep 20, 2023, 3:00 PM Kezhi Xiong  wrote:

> Hi,
>
> Are there any plans to upload PySpark 3.5.0 to PyPI (
> https://pypi.org/project/pyspark/)? It's still 3.4.1.
>
> Thanks,
> Kezhi
>
>
>


PySpark 3.5.0 on PyPI

2023-09-20 Thread Kezhi Xiong
Hi,

Are there any plans to upload PySpark 3.5.0 to PyPI (
https://pypi.org/project/pyspark/)? It's still 3.4.1.

Thanks,
Kezhi


[Spark 3.5.0] Is the protobuf-java JAR no longer shipped with Spark?

2023-09-20 Thread Gijs Hendriksen

Hi all,

This week, I tried upgrading to Spark 3.5.0, as it contained some fixes 
for spark-protobuf that I need for my project. However, my code is no 
longer running under Spark 3.5.0.


My build.sbt file is configured as follows:

val sparkV  = "3.5.0"
val hadoopV = "3.3.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % sparkV  % "provided",
  "org.apache.spark" %% "spark-sql"    % sparkV  % "provided",
  "org.apache.hadoop"    %  "hadoop-client"    % hadoopV % "provided",
  "org.apache.spark" %% "spark-protobuf"   % sparkV,
)

I am using sbt-assembly to build a fat JAR, but I exclude Spark and 
Hadoop JARs to limit the assembled JAR size. Spark (and its 
dependencies) are supplied in our environment by the jars/ directory 
included in the Spark distribution.


However, when running my application (which uses protobuf-java's 
CodedOutputStream for writing delimited protobuf files) with Spark 
3.5.0, I now get the following error:


...
Caused by: java.lang.ClassNotFoundException: 
com.google.protobuf.CodedOutputStream

    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 22 more

When inspecting the jars/ directory in the newest Spark release 
(spark-3.5.0-bin-hadoop3), I noticed the protobuf-java JAR was no longer 
included in the release, while it was present in Spark 3.4.1. My code 
seems to compile because protobuf-java is still a dependency of 
spark-core:3.5.0, but since the JAR is no longer included, the class 
cannot be found at runtime.


Is this expected/intentional behaviour? I was able to resolve the issue 
by manually adding protobuf-java as a dependency to my own project and 
including it in the fat JAR, but it seems weird to me that it is no 
longer shipped with Spark since the newest release. I also could not 
find any mention of this change in the release notes or elsewhere, but 
perhaps I missed something.


Thanks in advance for any help!

Cheers,
Gijs


Re: Discriptency sample standard deviation pyspark and Excel

2023-09-20 Thread Sean Owen
This has turned into a big thread for a simple thing and has been answered
3 times over now.

Neither is better, they just calculate different things. That the 'default'
is sample stddev is just convention.
stddev_pop is the simple standard deviation of a set of numbers
stddev_samp is used when the set of numbers is a sample from a notional
larger population, and you estimate the stddev of the population from the
sample.

They only differ in the denominator. Neither is more efficient at all or
more/less sensitive to outliers.
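
A quick plain-Python check of the two denominators, using the ten sample values
from Bjørn's example in this thread:

```python
import math

data = [52.7, 45.3, 60.2, 53.8, 49.1, 44.6, 58.0, 56.5, 47.9, 50.3]
n = len(data)
mean = sum(data) / n
ss = sum((x - mean) ** 2 for x in data)

stddev_pop = math.sqrt(ss / n)          # population: divide by n
stddev_samp = math.sqrt(ss / (n - 1))   # sample: divide by n - 1 (Bessel correction)

print(stddev_samp)   # ~5.3200, matches Spark's stddev_samp and Excel's sample stddev
print(stddev_pop)    # ~5.0470, matches Spark's stddev_pop
```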

On Wed, Sep 20, 2023 at 3:06 AM Mich Talebzadeh 
wrote:

> Spark uses the sample standard deviation stddev_samp by default, whereas
> *Hive* uses population standard deviation stddev_pop as default.
>
> My understanding is that spark uses sample standard deviation by default
> because
>
>- It is more commonly used.
>- It is more efficient to calculate.
>- It is less sensitive to outliers. (data points that differ
>significantly from other observations in a dataset. They can be caused by a
>variety of factors, such as measurement errors or edge events.)
>
> The sample standard deviation is less sensitive to outliers because it
> divides by N-1 instead of N. This means that a single outlier will have a
> smaller impact on the sample standard deviation than it would on the
> population standard deviation.
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 19 Sept 2023 at 21:50, Sean Owen  wrote:
>
>> Pyspark follows SQL databases here. stddev is stddev_samp, and sample
>> standard deviation is the calculation with the Bessel correction, n-1 in
>> the denominator. stddev_pop is simply standard deviation, with n in the
>> denominator.
>>
>> On Tue, Sep 19, 2023 at 7:13 AM Helene Bøe 
>> wrote:
>>
>>> Hi!
>>>
>>>
>>>
>>> I am applying the stddev function (so actually stddev_samp), however
>>> when comparing with the sample standard deviation in Excel the resuls do
>>> not match.
>>>
>>> I cannot find in your documentation any more specifics on how the sample
>>> standard deviation is calculated, so I cannot compare the difference toward
>>> excel, which uses
>>>
>>> .
>>>
>>> I am trying to avoid using Excel at all costs, but if the stddev_samp
>>> function is not calculating the standard deviation correctly I have a
>>> problem.
>>>
>>> I hope you can help me resolve this issue.
>>>
>>>
>>>
>>> Kindest regards,
>>>
>>>
>>>
>>> *Helene Bøe*
>>> *Graduate Project Engineer*
>>> Recycling Process & Support
>>>
>>> M: +47 980 00 887
>>> helene.b...@hydro.com
>>> 
>>>
>>> Norsk Hydro ASA
>>> Drammensveien 264
>>> NO-0283 Oslo, Norway
>>> www.hydro.com
>>> 
>>>
>>>
>>> NOTICE: This e-mail transmission, and any documents, files or previous
>>> e-mail messages attached to it, may contain confidential or privileged
>>> information. If you are not the intended recipient, or a person responsible
>>> for delivering it to the intended recipient, you are hereby notified that
>>> any disclosure, copying, distribution or use of any of the information
>>> contained in or attached to this message is STRICTLY PROHIBITED. If you
>>> have received this transmission in error, please immediately notify the
>>> sender and delete the e-mail and attached documents. Thank you.
>>>
>>


Re: Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

2023-09-20 Thread Gowtham S
Hi Spark Community,

Thank you for bringing up this issue. We've also encountered the same
challenge and are actively working on finding a solution. It's reassuring
to know that we're not alone in this.

If you have any insights or suggestions regarding how to address this
problem, please feel free to share them.

Looking forward to hearing from others who might have encountered similar
issues.


Thanks and regards,
Gowtham S


On Tue, 19 Sept 2023 at 17:23, Karthick  wrote:

> Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
>
> Dear Spark Community,
>
> I recently reached out to the Apache Flink community for assistance with a
> critical issue we are facing in our IoT platform, which relies on Apache
> Kafka and real-time data processing. We received some valuable insights and
> suggestions from the Apache Flink community, and now, we would like to seek
> your expertise and guidance on the same problem.
>
> In our IoT ecosystem, we are dealing with data streams from thousands of
> devices, each uniquely identified. To maintain data integrity and ordering,
> we have configured a Kafka topic with ten partitions, ensuring that each
> device's data is directed to its respective partition based on its unique
> identifier. While this architectural choice has been effective in
> maintaining data order, it has unveiled a significant challenge:
>
> *Slow Consumer and Data Skew Problem:* When a single device experiences
> processing delays, it acts as a bottleneck within the Kafka partition,
> leading to delays in processing data from other devices sharing the same
> partition. This issue severely affects the efficiency and scalability of
> our entire data processing pipeline.
>
> Here are some key details:
>
> - Number of Devices: 1000 (with potential growth)
> - Target Message Rate: 1000 messages per second (with expected growth)
> - Kafka Partitions: 10 (some partitions are overloaded)
> - We are planning to migrate from Apache Storm to Apache Flink/Spark.
>
> We are actively seeking guidance on the following aspects:
>
> *1. Independent Device Data Processing*: We require a strategy that
> guarantees one device's processing speed does not affect other devices in
> the same Kafka partition. In other words, we need a solution that ensures
> the independent processing of each device's data.
>
> *2. Custom Partitioning Strategy:* We are looking for a custom
> partitioning strategy to distribute the load evenly across Kafka
> partitions. Currently, we are using Murmur hashing with the device's unique
> identifier, but we are open to exploring alternative partitioning
> strategies.
>
> *3. Determining Kafka Partition Count:* We seek guidance on how to
> determine the optimal number of Kafka partitions to handle the target
> message rate efficiently.
>
> *4. Handling Data Skew:* Strategies or techniques for handling data skew
> within Apache Flink.
>
> We believe that many in your community may have faced similar challenges
> or possess valuable insights into addressing them. Your expertise and
> experiences can greatly benefit our team and the broader community dealing
> with real-time data processing.
>
> If you have any knowledge, solutions, or references to open-source
> projects, libraries, or community-contributed solutions that align with our
> requirements, we would be immensely grateful for your input.
>
> We appreciate your prompt attention to this matter and eagerly await your
> responses and insights. Your support will be invaluable in helping us
> overcome this critical challenge.
>
> Thank you for your time and consideration.
>
> Thanks & regards,
> Karthick.
>


Re: Discriptency sample standard deviation pyspark and Excel

2023-09-20 Thread Mich Talebzadeh
Spark uses the sample standard deviation stddev_samp by default, whereas
*Hive* uses population standard deviation stddev_pop as default.

My understanding is that spark uses sample standard deviation by default
because

   - It is more commonly used.
   - It is more efficient to calculate.
   - It is less sensitive to outliers. (data points that differ
   significantly from other observations in a dataset. They can be caused by a
   variety of factors, such as measurement errors or edge events.)

The sample standard deviation is less sensitive to outliers because it
divides by N-1 instead of N. This means that a single outlier will have a
smaller impact on the sample standard deviation than it would on the
population standard deviation.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 19 Sept 2023 at 21:50, Sean Owen  wrote:

> Pyspark follows SQL databases here. stddev is stddev_samp, and sample
> standard deviation is the calculation with the Bessel correction, n-1 in
> the denominator. stddev_pop is simply standard deviation, with n in the
> denominator.
>
> On Tue, Sep 19, 2023 at 7:13 AM Helene Bøe 
> wrote:
>
>> Hi!
>>
>>
>>
>> I am applying the stddev function (so actually stddev_samp), however when
>> comparing with the sample standard deviation in Excel the resuls do not
>> match.
>>
>> I cannot find in your documentation any more specifics on how the sample
>> standard deviation is calculated, so I cannot compare the difference toward
>> excel, which uses
>>
>> .
>>
>> I am trying to avoid using Excel at all costs, but if the stddev_samp
>> function is not calculating the standard deviation correctly I have a
>> problem.
>>
>> I hope you can help me resolve this issue.
>>
>>
>>
>> Kindest regards,
>>
>>
>>
>> *Helene Bøe*
>> *Graduate Project Engineer*
>> Recycling Process & Support
>>
>> M: +47 980 00 887
>> helene.b...@hydro.com
>> 
>>
>> Norsk Hydro ASA
>> Drammensveien 264
>> NO-0283 Oslo, Norway
>> www.hydro.com
>> 
>>
>>
>> NOTICE: This e-mail transmission, and any documents, files or previous
>> e-mail messages attached to it, may contain confidential or privileged
>> information. If you are not the intended recipient, or a person responsible
>> for delivering it to the intended recipient, you are hereby notified that
>> any disclosure, copying, distribution or use of any of the information
>> contained in or attached to this message is STRICTLY PROHIBITED. If you
>> have received this transmission in error, please immediately notify the
>> sender and delete the e-mail and attached documents. Thank you.
>>
>


unsubscribe

2023-09-19 Thread Danilo Sousa
unsubscribe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



unsubscribe

2023-09-19 Thread Ghousia
unsubscribe


Re: Discriptency sample standard deviation pyspark and Excel

2023-09-19 Thread Mich Talebzadeh
Hi Helen,

Assuming you want to calculate stddev_samp,  Spark correctly points  STDDEV
to STDDEV_SAMP.

In below replace sales with your table name and AMOUNT_SOLD with the column
you want to do the calculation

SELECT
  SQRT((SUM(POWER(amount_sold, 2)) - (COUNT(1) * POWER(AVG(amount_sold), 2))) / (COUNT(1) - 1)) AS MYSTDDEV,
  STDDEV(amount_sold) AS STDDEV,
  STDDEV_SAMP(amount_sold) AS STDDEV_SAMP,
  STDDEV_POP(amount_sold) AS STDDEV_POP
FROM sales;

for me it returned

+--------------------+--------------------+--------------------+---------------------+
|      mystddev      |       stddev       |    stddev_samp     |     stddev_pop      |
+--------------------+--------------------+--------------------+---------------------+
| 260.7270919450411  | 260.7270722861637  | 260.7270722861637  | 260.72704617042166  |
+--------------------+--------------------+--------------------+---------------------+

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 19 Sept 2023 at 13:14, Helene Bøe 
wrote:

> Hi!
>
>
>
> I am applying the stddev function (so actually stddev_samp), however when
> comparing with the sample standard deviation in Excel the resuls do not
> match.
>
> I cannot find in your documentation any more specifics on how the sample
> standard deviation is calculated, so I cannot compare the difference toward
> excel, which uses
>
> .
>
> I am trying to avoid using Excel at all costs, but if the stddev_samp
> function is not calculating the standard deviation correctly I have a
> problem.
>
> I hope you can help me resolve this issue.
>
>
>
> Kindest regards,
>
>
>
> *Helene Bøe*
> *Graduate Project Engineer*
> Recycling Process & Support
>
> M: +47 980 00 887
> helene.b...@hydro.com
> 
>
> Norsk Hydro ASA
> Drammensveien 264
> NO-0283 Oslo, Norway
> www.hydro.com
> 
>
>
> NOTICE: This e-mail transmission, and any documents, files or previous
> e-mail messages attached to it, may contain confidential or privileged
> information. If you are not the intended recipient, or a person responsible
> for delivering it to the intended recipient, you are hereby notified that
> any disclosure, copying, distribution or use of any of the information
> contained in or attached to this message is STRICTLY PROHIBITED. If you
> have received this transmission in error, please immediately notify the
> sender and delete the e-mail and attached documents. Thank you.
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Discriptency sample standard deviation pyspark and Excel

2023-09-19 Thread Bjørn Jørgensen
from pyspark.sql import SparkSession
from pyspark.sql.functions import stddev_samp, stddev_pop

spark = SparkSession.builder.getOrCreate()

data = [(52.7,), (45.3,), (60.2,), (53.8,), (49.1,), (44.6,), (58.0,),
(56.5,), (47.9,), (50.3,)]
df = spark.createDataFrame(data, ["value"])

df.select(stddev_samp("value").alias("sample_stddev")).show()

+-----------------+
|    sample_stddev|
+-----------------+
|5.320025062597606|
+-----------------+



In MS Excel 365 Norwegian

[image: image.png]


=STDAVVIKA(B1:B10)

=STDAV.S(B1:B10)

They both print
5,32002506

which is the same as PySpark does.





tir. 19. sep. 2023 kl. 14:15 skrev Helene Bøe :

> Hi!
>
>
>
> I am applying the stddev function (so actually stddev_samp), however when
> comparing with the sample standard deviation in Excel the resuls do not
> match.
>
> I cannot find in your documentation any more specifics on how the sample
> standard deviation is calculated, so I cannot compare the difference toward
> excel, which uses
>
> .
>
> I am trying to avoid using Excel at all costs, but if the stddev_samp
> function is not calculating the standard deviation correctly I have a
> problem.
>
> I hope you can help me resolve this issue.
>
>
>
> Kindest regards,
>
>
>
> *Helene Bøe*
> *Graduate Project Engineer*
> Recycling Process & Support
>
> M: +47 980 00 887
> helene.b...@hydro.com
> 
>
> Norsk Hydro ASA
> Drammensveien 264
> NO-0283 Oslo, Norway
> www.hydro.com
> 
>
>
> NOTICE: This e-mail transmission, and any documents, files or previous
> e-mail messages attached to it, may contain confidential or privileged
> information. If you are not the intended recipient, or a person responsible
> for delivering it to the intended recipient, you are hereby notified that
> any disclosure, copying, distribution or use of any of the information
> contained in or attached to this message is STRICTLY PROHIBITED. If you
> have received this transmission in error, please immediately notify the
> sender and delete the e-mail and attached documents. Thank you.
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Create an external table with DataFrameWriterV2

2023-09-19 Thread Christophe Préaud

Hi,

I usually create an external Delta table with the command below, using 
DataFrameWriter API:


df.write
   .format("delta")
   .option("path", "")
   .saveAsTable("")

Now I would like to use the DataFrameWriterV2 API.
I have tried the following command:

df.writeTo("")
   .using("delta")
   .option("path", "")
   .createOrReplace()

but it creates a managed table, not an external one.

Can you tell me the correct syntax for creating an external table with 
DataFrameWriterV2 API?


Thanks,
Christophe.


Re: Discriptency sample standard deviation pyspark and Excel

2023-09-19 Thread Sean Owen
Pyspark follows SQL databases here. stddev is stddev_samp, and sample
standard deviation is the calculation with the Bessel correction, n-1 in
the denominator. stddev_pop is simply standard deviation, with n in the
denominator.
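
For illustration, here is a minimal PySpark sketch of the difference (the sample
values are made up for this example):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(x,) for x in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]], ["value"])

df.select(
    F.stddev_samp("value").alias("sample"),     # sqrt(sum((x - mean)^2) / (n - 1))
    F.stddev_pop("value").alias("population"),  # sqrt(sum((x - mean)^2) / n)
    F.stddev("value").alias("default"),         # stddev is an alias of stddev_samp
).show()

With these values the population result is exactly 2.0 and the sample result is
about 2.14, which is the n-1 versus n difference described above.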

On Tue, Sep 19, 2023 at 7:13 AM Helene Bøe 
wrote:

> Hi!
>
>
>
> I am applying the stddev function (so actually stddev_samp), however when
> comparing with the sample standard deviation in Excel the results do not
> match.
>
> I cannot find in your documentation any more specifics on how the sample
> standard deviation is calculated, so I cannot compare the difference toward
> excel, which uses
>
> [image].
>
> I am trying to avoid using Excel at all costs, but if the stddev_samp
> function is not calculating the standard deviation correctly I have a
> problem.
>
> I hope you can help me resolve this issue.
>
>
>
> Kindest regards,
>
>
>
> *Helene Bøe*
> *Graduate Project Engineer*
> Recycling Process & Support
>
> M: +47 980 00 887
> helene.b...@hydro.com
> 
>
> Norsk Hydro ASA
> Drammensveien 264
> NO-0283 Oslo, Norway
> www.hydro.com
> 
>
>
> NOTICE: This e-mail transmission, and any documents, files or previous
> e-mail messages attached to it, may contain confidential or privileged
> information. If you are not the intended recipient, or a person responsible
> for delivering it to the intended recipient, you are hereby notified that
> any disclosure, copying, distribution or use of any of the information
> contained in or attached to this message is STRICTLY PROHIBITED. If you
> have received this transmission in error, please immediately notify the
> sender and delete the e-mail and attached documents. Thank you.
>


Spark streaming sourceArchiveDir does not move file to archive directory

2023-09-19 Thread Yunus Emre G?rses
Hello everyone,

I'm using Scala and Spark 3.4.1 on Windows 10. While streaming with Spark, I set
the `cleanSource` option to "archive" and the `sourceArchiveDir` option to
"archived", as in the code below.

```
spark.readStream
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "archived")
  .option("enforceSchema", false)
  .option("header", includeHeader)
  .option("inferSchema", inferSchema)
  .options(otherOptions)
  .schema(csvSchema.orNull)
  .csv(FileUtils.getPath(sourceSettings.dataFolderPath, 
mappingSource.path).toString)
```

The code ```FileUtils.getPath(sourceSettings.dataFolderPath, 
mappingSource.path)``` returns a relative path like: 
test-data\streaming-folder\patients

When I start the stream, Spark does not move the source csv to the archive folder.
After working on it a bit, I started debugging the Spark source code. I found the
```override protected def cleanTask(entry: FileEntry): Unit``` method in the
`FileStreamSource.scala` file in the `org.apache.spark.sql.execution.streaming`
package.
On line 569, the ```!fileSystem.rename(curPath, newPath)``` call is supposed to
move the source file to the archive folder. However, when I debugged, I noticed
that the curPath and newPath values were as follows:

**curPath**: 
`file:/C:/dev/be/data-integration-suite/test-data/streaming-folder/patients/patients-success.csv`

**newPath**: 
`file:/C:/dev/be/data-integration-suite/archived/C:/dev/be/data-integration-suite/test-data/streaming-folder/patients/patients-success.csv`

It seems that the absolute path of the csv file was appended when creating `newPath`,
because `C:/dev/be/data-integration-suite` appears twice in newPath. This is the
reason Spark archiving does not work. Instead, newPath should be:
`file:/C:/dev/be/data-integration-suite/archived/test-data/streaming-folder/patients/patients-success.csv`.
I guess this is more related to the Spark library and may be a Spark bug. Is there
any workaround or Spark config to overcome this problem?

Thanks
Best regards,
Yunus Emre


Discriptency sample standard deviation pyspark and Excel

2023-09-19 Thread Helene Bøe
Hi!

I am applying the stddev function (so actually stddev_samp); however, when
comparing with the sample standard deviation in Excel the results do not match.
I cannot find in your documentation any more specifics on how the sample 
standard deviation is calculated, so I cannot compare the difference toward 
excel, which uses
[cid:image003.png@01D9EAF8.AE708920].
I am trying to avoid using Excel at all costs, but if the stddev_samp function 
is not calculating the standard deviation correctly I have a problem.
I hope you can help me resolve this issue.

Kindest regards,

Helene Bøe
Graduate Project Engineer
Recycling Process & Support
M: +47 980 00 887
helene.b...@hydro.com

Norsk Hydro ASA
Drammensveien 264
NO-0283 Oslo, Norway
www.hydro.com

[cid:image004.png@01D9EAF8.AE708920]

NOTICE: This e-mail transmission, and any documents, files or previous e-mail 
messages attached to it, may contain confidential or privileged information. If 
you are not the intended recipient, or a person responsible for delivering it 
to the intended recipient, you are hereby notified that any disclosure, 
copying, distribution or use of any of the information contained in or attached 
to this message is STRICTLY PROHIBITED. If you have received this transmission 
in error, please immediately notify the sender and delete the e-mail and 
attached documents. Thank you.


Re: Spark stand-alone mode

2023-09-19 Thread Patrick Tucci
Multiple applications can run at once, but you need to either configure
Spark or your applications to allow that. In stand-alone mode, each
application attempts to take all resources available by default. This
section of the documentation has more details:

https://spark.apache.org/docs/latest/spark-standalone.html#resource-scheduling

Explicitly setting the resources per application limits the resources to
the configured values for the lifetime of the application. You can use
dynamic allocation to allow Spark to scale the resources up and down per
application based on load, but the configuration is relatively more complex:

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
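
As a minimal sketch of explicitly capping resources per application (the property
names are standard Spark configuration; the master URL and values here are just
placeholders):

from pyspark.sql import SparkSession

# Cap what this application takes from the stand-alone cluster so that
# other users' jobs can run at the same time.
spark = (
    SparkSession.builder
    .appName("capped-app")
    .master("spark://master-host:7077")      # hypothetical master URL
    .config("spark.cores.max", "16")         # total cores this app may use
    .config("spark.executor.cores", "4")     # cores per executor
    .config("spark.executor.memory", "16g")  # memory per executor
    .getOrCreate()
)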

On Mon, Sep 18, 2023 at 3:53 PM Ilango  wrote:

>
> Thanks all for your suggestions. Noted with thanks.
> Just wanted share few more details about the environment
> 1. We use NFS for data storage and data is in parquet format
> 2. All HPC nodes are connected and already work as a cluster for Studio
> workbench. I can setup password less SSH if it not exist already.
> 3. We will stick with NFS for now and stand alone then may be will explore
> HDFS and YARN.
>
> Can you please confirm whether multiple users can run spark jobs at the
> same time?
> If so I will start working on it and let you know how it goes
>
> Mich, the link to Hadoop is not working. Can you please check and let me
> know the correct link. Would like to explore Hadoop option as well.
>
>
>
> Thanks,
> Elango
>
> On Sat, Sep 16, 2023, 4:20 AM Bjørn Jørgensen 
> wrote:
>
>> you need to setup ssh without password, use key instead.  How to connect
>> without password using SSH (passwordless)
>> 
>>
>> fre. 15. sep. 2023 kl. 20:55 skrev Mich Talebzadeh <
>> mich.talebza...@gmail.com>:
>>
>>> Hi,
>>>
>>> Can these 4 nodes talk to each other through ssh as trusted hosts (on
>>> top of the network that Sean already mentioned)? Otherwise you need to set
>>> it up. You can install a LAN if you have another free port at the back of
>>> your HPC nodes. They should
>>>
>>> You ought to try to set up a Hadoop cluster pretty easily. Check this
>>> old article of mine for Hadoop set-up.
>>>
>>>
>>> https://www.linkedin.com/pulse/diy-festive-season-how-install-configure-big-data-so-mich/?trackingId=z7n5tx7tQOGK9tcG9VClkw%3D%3D
>>>
>>> Hadoop will provide you with a common storage layer (HDFS) that these
>>> nodes will be able to share and talk. Yarn is your best bet as the resource
>>> manager with reasonably powerful hosts you have. However, for now the Stand
>>> Alone mode will do. Make sure that the Metastore you choose, (by default it
>>> will use Hive Metastore called Derby :( ) is something respetable like
>>> Postgres DB that can handle multiple concurrent spark jobs
>>>
>>> HTH
>>>
>>>
>>> Mich Talebzadeh,
>>> Distinguished Technologist, Solutions Architect & Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 15 Sept 2023 at 07:04, Ilango  wrote:
>>>

 Hi all,

 We have 4 HPC nodes and installed spark individually in all nodes.

 Spark is used as local mode(each driver/executor will have 8 cores and
 65 GB) in Sparklyr/pyspark using Rstudio/Posit workbench. Slurm is used as
 scheduler.

 As this is local mode, we are facing performance issue(as only one
 executor) when it comes dealing with large datasets.

 Can I convert this 4 nodes into spark standalone cluster. We dont have
 hadoop so yarn mode is out of scope.

 Shall I follow the official documentation for setting up standalone
 cluster. Will it work? Do I need to aware anything else?
 Can you please share your thoughts?

 Thanks,
 Elango

>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>


Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

2023-09-19 Thread Karthick
Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

Dear Spark Community,

I recently reached out to the Apache Flink community for assistance with a
critical issue we are facing in our IoT platform, which relies on Apache
Kafka and real-time data processing. We received some valuable insights and
suggestions from the Apache Flink community, and now, we would like to seek
your expertise and guidance on the same problem.

In our IoT ecosystem, we are dealing with data streams from thousands of
devices, each uniquely identified. To maintain data integrity and ordering,
we have configured a Kafka topic with ten partitions, ensuring that each
device's data is directed to its respective partition based on its unique
identifier. While this architectural choice has been effective in
maintaining data order, it has unveiled a significant challenge:

*Slow Consumer and Data Skew Problem:* When a single device experiences
processing delays, it acts as a bottleneck within the Kafka partition,
leading to delays in processing data from other devices sharing the same
partition. This issue severely affects the efficiency and scalability of
our entire data processing pipeline.

Here are some key details:

- Number of Devices: 1000 (with potential growth)
- Target Message Rate: 1000 messages per second (with expected growth)
- Kafka Partitions: 10 (some partitions are overloaded)
- We are planning to migrate from Apache Storm to Apache Flink/Spark.

We are actively seeking guidance on the following aspects:

*1. Independent Device Data Processing*: We require a strategy that
guarantees one device's processing speed does not affect other devices in
the same Kafka partition. In other words, we need a solution that ensures
the independent processing of each device's data.

*2. Custom Partitioning Strategy:* We are looking for a custom partitioning
strategy to distribute the load evenly across Kafka partitions. Currently,
we are using Murmur hashing with the device's unique identifier, but we are
open to exploring alternative partitioning strategies.

*3. Determining Kafka Partition Count:* We seek guidance on how to
determine the optimal number of Kafka partitions to handle the target
message rate efficiently.

*4. Handling Data Skew:* Strategies or techniques for handling data skew
within Apache Flink.
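
For reference, a minimal Structured Streaming sketch of the Spark-side pieces
involved (broker, topic and values are placeholders; this is an illustration,
not a tested configuration for this pipeline):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
    .option("subscribe", "device-events")              # placeholder topic
    .option("minPartitions", "40")             # split Kafka partitions into more Spark tasks
    .option("maxOffsetsPerTrigger", "100000")  # bound the size of each micro-batch
    .load()
)

# Grouping by the device key keeps devices logically independent downstream,
# regardless of which Kafka partition they arrived on.
per_device = (
    raw.select(F.col("key").cast("string").alias("device_id"), "value", "timestamp")
       .withWatermark("timestamp", "10 minutes")
       .groupBy("device_id", F.window("timestamp", "1 minute"))
       .count()
)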

We believe that many in your community may have faced similar challenges or
possess valuable insights into addressing them. Your expertise and
experiences can greatly benefit our team and the broader community dealing
with real-time data processing.

If you have any knowledge, solutions, or references to open-source
projects, libraries, or community-contributed solutions that align with our
requirements, we would be immensely grateful for your input.

We appreciate your prompt attention to this matter and eagerly await your
responses and insights. Your support will be invaluable in helping us
overcome this critical challenge.

Thank you for your time and consideration.

Thanks & regards,
Karthick.


unsubscribe

2023-09-18 Thread Ghazi Naceur
unsubscribe


Re: Data Duplication Bug Found - Structured Streaming Versions 3.4.1, 3.2.4, and 3.3.2

2023-09-18 Thread Jerry Peng
Hi Craig,

Thank you for sending us more information.  Can you answer my previous
question, which I don't think the document addresses? How did you determine
duplicates in the output?  How was the output data read? The FileStreamSink
provides exactly-once writes ONLY if you read the output with the
FileStreamSource or the FileSource (batch).  A log is used to determine
what data is committed, and those aforementioned sources know how to
use that log to read the data "exactly-once".  So there may be duplicated
data written on disk.  If you simply read the data files written to
disk you may see duplicates when there are failures.  However, if you read
the output location with Spark you should get exactly-once results (unless
there is a bug), since Spark will know how to use the commit log to see which
data files are committed and which are not.
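
For illustration, a minimal PySpark sketch of the two ways of counting the output
(the path and format are placeholders for whatever the sink writes):

from pyspark.sql import SparkSession
import glob

spark = SparkSession.builder.getOrCreate()

# Reading the sink's output directory through Spark consults the
# _spark_metadata commit log, so only files from committed batches are counted.
committed = spark.read.parquet("/tmp/stream-output")
print(committed.count())

# Listing the files directly bypasses the commit log, so files left behind by
# failed or retried batches can show up as apparent duplicates.
raw_files = glob.glob("/tmp/stream-output/part-*.parquet")
print(len(raw_files))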

Best,

Jerry

On Mon, Sep 18, 2023 at 1:18 PM Craig Alfieri 
wrote:

> Hi Russell/Jerry/Mich,
>
>
>
> Appreciate your patience on this.
>
>
>
> Attached are more details on how this duplication “error” was found.
>
> Since we’re still unsure I am using “error” in quotes.
>
>
>
> We’d love the opportunity to work with any of you directly and/or the
> wider Spark community to triage this or get a better understanding of the
> nature of what we’re experiencing.
>
>
>
> Our platform provides the ability to fully reproduce this.
>
>
>
> Once you have had the chance to review the attached draft, let us know if
> there are any questions in the meantime. Again, we welcome the opportunity
> to work with the teams on this.
>
>
>
> Best-
>
> Craig
>
>
>
>
>
>
>
> *From: *Craig Alfieri 
> *Date: *Thursday, September 14, 2023 at 8:45 PM
> *To: *russell.spit...@gmail.com 
> *Cc: *Jerry Peng , Mich Talebzadeh <
> mich.talebza...@gmail.com>, user@spark.apache.org ,
> connor.mc...@antithesis.com 
> *Subject: *Re: Data Duplication Bug Found - Structured Streaming Versions
> 3.4.1, 3.2.4, and 3.3.2
>
> Hi Russell et al,
>
>
>
> Acknowledging receipt; we’ll get these answers back to the group.
>
>
>
> Follow-up forthcoming.
>
>
>
> Craig
>
>
>
>
>
>
>
> On Sep 14, 2023, at 6:38 PM, russell.spit...@gmail.com wrote:
>
> Exactly once should be output sink dependent, what sink was being used?
>
> Sent from my iPhone
>
>
>
> On Sep 14, 2023, at 4:52 PM, Jerry Peng 
> wrote:
>
> 
>
> Craig,
>
>
>
> Thanks! Please let us know the result!
>
>
>
> Best,
>
>
>
> Jerry
>
>
>
> On Thu, Sep 14, 2023 at 12:22 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
>
> Hi Craig,
>
>
>
> Can you please clarify what this bug is and provide sample code causing
> this issue?
>
>
>
> HTH
>
>
> Mich Talebzadeh,
>
> Distinguished Technologist, Solutions Architect & Engineer
>
> London
>
> United Kingdom
>
>
>
>  view my Linkedin profile
> 
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 14 Sept 2023 at 17:48, Craig Alfieri 
> wrote:
>
> Hello Spark Community-
>
>
>
> As part of a research effort, our team here at Antithesis tests for
> correctness/fault tolerance of major OSS projects.
>
> Our team recently was testing Spark’s Structured Streaming, and we came
> across a data duplication bug we’d like to work with the teams on to
> resolve.
>
>
>
> Our intention is to utilize this as a future case study for our platform,
> but prior to doing so we would like to have a resolution in place so that an
> announcement isn’t alarming to the user base.
>
>
>
> Attached is a high level .pdf that reviews the High Availability set-up
> put under test.
>
> This was also tested across the three latest versions, and the same
> behavior was observed.
>
>
>
> We can reproduce this error readily since our environment is fully
> deterministic; we are just not Spark experts and would like to work with
> someone in the community to resolve this.
>
>
>
> Please let us know at your earliest convenience.
>
>
>
> Best
>
>
>
>
> *Craig Alfieri*
>
> c: 917.841.1652
>
> craig.alfi...@antithesis.com
>
> New York, NY.
>
> Antithesis.com
> 
>
>
>
> We can't talk about most of the bugs that we've found for our customers,
>
> but some customers like to speak about their work with us:
>
> https://github.com/mongodb/mongo/wiki/Testing-MongoDB-with-Antithesis
>
>
>
>
>
>
>
> 

Re: getting emails in different order!

2023-09-18 Thread Mich Talebzadeh
OK thanks Sean. Not a big issue for me. It normally happens in the AM, GMT/London
time. I see the email trail but not the thread owner's original email first;
normally the responses arrive first.


Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 18 Sept 2023 at 17:16, Sean Owen  wrote:

> I have seen this, and not sure if it's just the ASF mailer being weird, or
> more likely, because emails are moderated and we inadvertently moderate
> them out of order
>
> On Mon, Sep 18, 2023 at 10:59 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I use gmail to receive spark user group emails.
>>
>> On occasions, I get the latest emails first and later in the day I
>> receive the original email.
>>
>> Has anyone else seen this behaviour recently?
>>
>> Thanks
>>
>> Mich Talebzadeh,
>> Distinguished Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Re: getting emails in different order!

2023-09-18 Thread Sean Owen
I have seen this, and not sure if it's just the ASF mailer being weird, or
more likely, because emails are moderated and we inadvertently moderate
them out of order

On Mon, Sep 18, 2023 at 10:59 AM Mich Talebzadeh 
wrote:

> Hi,
>
> I use gmail to receive spark user group emails.
>
> On occasions, I get the latest emails first and later in the day I receive
> the original email.
>
> Has anyone else seen this behaviour recently?
>
> Thanks
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Spark stand-alone mode

2023-09-18 Thread Ilango
Thanks all for your suggestions. Noted with thanks.
Just wanted to share a few more details about the environment:
1. We use NFS for data storage and the data is in parquet format
2. All HPC nodes are connected and already work as a cluster for the Studio
workbench. I can set up passwordless SSH if it does not exist already.
3. We will stick with NFS and stand-alone for now, then maybe explore
HDFS and YARN.

Can you please confirm whether multiple users can run spark jobs at the
same time?
If so I will start working on it and let you know how it goes

Mich, the link to Hadoop is not working. Can you please check and let me
know the correct link. Would like to explore Hadoop option as well.



Thanks,
Elango

On Sat, Sep 16, 2023, 4:20 AM Bjørn Jørgensen 
wrote:

> you need to setup ssh without password, use key instead.  How to connect
> without password using SSH (passwordless)
> 
>
> fre. 15. sep. 2023 kl. 20:55 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>> Hi,
>>
>> Can these 4 nodes talk to each other through ssh as trusted hosts (on top
>> of the network that Sean already mentioned)? Otherwise you need to set it
>> up. You can install a LAN if you have another free port at the back of your
>> HPC nodes. They should
>>
>> You ought to try to set up a Hadoop cluster pretty easily. Check this old
>> article of mine for Hadoop set-up.
>>
>>
>> https://www.linkedin.com/pulse/diy-festive-season-how-install-configure-big-data-so-mich/?trackingId=z7n5tx7tQOGK9tcG9VClkw%3D%3D
>>
>> Hadoop will provide you with a common storage layer (HDFS) that these
>> nodes will be able to share and talk. Yarn is your best bet as the resource
>> manager with reasonably powerful hosts you have. However, for now the Stand
>> Alone mode will do. Make sure that the Metastore you choose, (by default it
>> will use Hive Metastore called Derby :( ) is something respetable like
>> Postgres DB that can handle multiple concurrent spark jobs
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Distinguished Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 15 Sept 2023 at 07:04, Ilango  wrote:
>>
>>>
>>> Hi all,
>>>
>>> We have 4 HPC nodes and installed spark individually in all nodes.
>>>
>>> Spark is used as local mode(each driver/executor will have 8 cores and
>>> 65 GB) in Sparklyr/pyspark using Rstudio/Posit workbench. Slurm is used as
>>> scheduler.
>>>
>>> As this is local mode, we are facing performance issue(as only one
>>> executor) when it comes dealing with large datasets.
>>>
>>> Can I convert this 4 nodes into spark standalone cluster. We dont have
>>> hadoop so yarn mode is out of scope.
>>>
>>> Shall I follow the official documentation for setting up standalone
>>> cluster. Will it work? Do I need to aware anything else?
>>> Can you please share your thoughts?
>>>
>>> Thanks,
>>> Elango
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


getting emails in different order!

2023-09-18 Thread Mich Talebzadeh
Hi,

I use gmail to receive spark user group emails.

On occasions, I get the latest emails first and later in the day I receive
the original email.

Has anyone else seen this behaviour recently?

Thanks

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


[ANNOUNCE] Apache Kyuubi released 1.7.2

2023-09-18 Thread Zhen Wang
Hi all,

The Apache Kyuubi community is pleased to announce that
Apache Kyuubi 1.7.2 has been released!

Apache Kyuubi is a distributed and multi-tenant gateway to provide
serverless SQL on data warehouses and lakehouses.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and lakehouses.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark at the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.7.2.html

To learn more about Apache Kyuubi, please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community
who made this release possible!

Thanks,
On behalf of Apache Kyuubi community




Re: First Time contribution.

2023-09-17 Thread Haejoon Lee
Welcome Ram! :-)

I would recommend you to check
https://issues.apache.org/jira/browse/SPARK-37935 out as a starter task.

Refer to https://github.com/apache/spark/pull/41504,
https://github.com/apache/spark/pull/41455 as an example PR.

Or you can also add a new sub-task if you find any error messages that need
improvement.

Thanks!

On Mon, Sep 18, 2023 at 9:33 AM Denny Lee  wrote:

> Hi Ram,
>
> We have some good guidance at
> https://spark.apache.org/contributing.html
>
> HTH!
> Denny
>
>
> On Sun, Sep 17, 2023 at 17:18 ram manickam  wrote:
>
>>
>>
>>
>> Hello All,
>> Recently, joined this community and would like to contribute. Is there a
>> guideline or recommendation on tasks that can be picked up by a first timer
>> or a started task?.
>>
>> Tried looking at stack overflow tag: apache-spark
>> , couldn't find
>> any information for first time contributors.
>>
>> Looking forward to learning and contributing.
>>
>> Thanks
>> Ram
>>
>


Re: First Time contribution.

2023-09-17 Thread Denny Lee
Hi Ram,

We have some good guidance at
https://spark.apache.org/contributing.html

HTH!
Denny


On Sun, Sep 17, 2023 at 17:18 ram manickam  wrote:

>
>
>
> Hello All,
> Recently, joined this community and would like to contribute. Is there a
> guideline or recommendation on tasks that can be picked up by a first timer
> or a started task?.
>
> Tried looking at stack overflow tag: apache-spark
> , couldn't find
> any information for first time contributors.
>
> Looking forward to learning and contributing.
>
> Thanks
> Ram
>


About Peak Jvm Memory Onheap

2023-09-17 Thread Nebi Aydin
Hi all,
I couldn't find any useful doc that explains the *Peak JVM Memory Onheap*
field on the Spark UI.
Most of the time my applications have a very low *On heap storage memory*
and *Peak execution memory on heap*,
but a very big *Peak JVM Memory Onheap* on the Spark UI.
Can someone please explain the difference between these metrics?


Fwd: First Time contribution.

2023-09-17 Thread ram manickam
Hello All,
Recently, joined this community and would like to contribute. Is there a
guideline or recommendation on tasks that can be picked up by a first timer
or a started task?.

Tried looking at stack overflow tag: apache-spark
, couldn't find
any information for first time contributors.

Looking forward to learning and contributing.

Thanks
Ram


Re: Filter out 20% of rows

2023-09-16 Thread ashok34...@yahoo.com.INVALID
 Thank you Bjorn and Mich. 
Appreciated
Best
On Saturday, 16 September 2023 at 16:50:04 BST, Mich Talebzadeh 
 wrote:  
 
 Hi Bjorn,
I thought that one is better off using percentile_approx as it seems to be the 
recommended approach for computing percentiles and can simplify the code. I 
have modified your code to use percentile_approx rather than manually computing 
it. It would be interesting to hear ideas on this.
Here is the code:
# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.repl.eagerEval.enabled", "True"
    ).set("spark.sql.adaptive.enabled", "True").set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "1"
    )

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My super app", SparkConf())
sc = SparkContext.getOrCreate()
sc.setLogLevel("ERROR")

def generate_ip():
    return ".".join(str(random.randint(0, 255)) for _ in range(4))

def generate_timestamp():
    return pd.Timestamp(
        year=random.randint(2021, 2023),
        month=random.randint(1, 12),
        day=random.randint(1, 28),
        hour=random.randint(0, 23),
        minute=random.randint(0, 59),
        second=random.randint(0, 59)
    )

def random_gbps():
    return random.uniform(0, 10)

# Number of rows
n = 20

data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": generate_timestamp()} for _ in range(n)]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()

agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

# Calculate the 80th percentile value
percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps, 0.8)").alias("percentile_80")).collect()[0]["percentile_80"]

# Filter the DataFrame based on the condition
filtered_df = df.filter(df["gbps"] >= percentile_80)

# Show the filtered DataFrame
print(f"Filtered DataFrame")
filtered_df.show()

print(f"Total rows in data frame = {df.count()}")
print(f"Result satisfying 80% percentile = {filtered_df.count()}")

And this is the results

+---------------+------------------+-------------------+
|   incoming_ips|              gbps|          date_time|
+---------------+------------------+-------------------+
|129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
| 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
|  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
|219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
|234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
|  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
| 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
|  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
|  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
| 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
| 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
|  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
|   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
| 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
| 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
| 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
|  18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
|    31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
|    135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
|104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
+---------------+------------------+-------------------+

Filtered DataFrame
+--------------+-----------------+-------------------+
|  incoming_ips|             gbps|          date_time|
+--------------+-----------------+-------------------+
| 163.26.238.22|  9.8999646499433|2023-01-12 17:54:44|
| 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
|98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
|145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
|   135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
+--------------+-----------------+-------------------+

Re: Filter out 20% of rows

2023-09-16 Thread Bjørn Jørgensen
EDIT:
I don't think that the question asker will have only returned the top 25
percentages.

lør. 16. sep. 2023 kl. 21:54 skrev Bjørn Jørgensen :

> percentile_approx returns the approximate percentile(s). The memory consumption is
> bounded. The larger the accuracy parameter we choose, the smaller the error we get.
> The default accuracy value is 10000, to match the Hive default setting.
> Choose a smaller value for a smaller memory footprint.
>
> When I run my code on a single PC where N = 10 million, it takes 22.52
> seconds. Notebook added.
>
> I don't think that the question asker will have only returned the top
> 20 percentages.
>
>
> lør. 16. sep. 2023 kl. 17:49 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>> Hi Bjorn,
>>
>> I thought that one is better off using percentile_approx as it seems to
>> be the recommended approach for computing percentiles and can simplify the
>> code.
>> I have modified your code to use percentile_approx rather than manually
>> computing it. It would be interesting to hear ideas on this.
>>
>> Here is the code:
>>
>> # Standard library imports
>> import json
>> import multiprocessing
>> import os
>> import re
>> import sys
>> import random
>>
>> # Third-party imports
>> import numpy as np
>> import pandas as pd
>> import pyarrow
>>
>> # Pyspark imports
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import SparkSession, functions as F, Window
>> from pyspark.sql.functions import (
>> col, concat, concat_ws, expr, lit, trim, udf
>> )
>> from pyspark.sql.types import (
>> IntegerType, StringType, StructField, StructType,
>> DoubleType, TimestampType
>> )
>> from pyspark import pandas as ps
>>
>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  #
>> e.g. 4015976448
>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>> conf.setMaster("local[{}]".format(number_cores))
>> conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>> "spark.sql.repl.eagerEval.enabled", "True"
>> ).set("spark.sql.adaptive.enabled", "True").set(
>> "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>> ).set(
>> "spark.sql.repl.eagerEval.maxNumRows", "1"
>> )
>>
>> return
>> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>>
>> spark = get_spark_session("My super app", SparkConf())
>> sc = SparkContext.getOrCreate()
>> sc.setLogLevel("ERROR")
>>
>> def generate_ip():
>> return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>
>> def generate_timestamp():
>> return pd.Timestamp(
>> year=random.randint(2021, 2023),
>> month=random.randint(1, 12),
>> day=random.randint(1, 28),
>> hour=random.randint(0, 23),
>> minute=random.randint(0, 59),
>> second=random.randint(0, 59)
>> )
>>
>> def random_gbps():
>> return random.uniform(0, 10)
>>
>> # Number of rows
>> n = 20
>>
>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
>> "date_time": generate_timestamp()} for _ in range(n)]
>> df = spark.createDataFrame(pd.DataFrame(data))
>> df.show()
>>
>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>
>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>
>> # Calculate the 80th percentile value
>> percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps,
>> 0.8)").alias("percentile_80")).collect()[0]["percentile_80"]
>>
>> # Filter the DataFrame based on the condition
>> filtered_df = df.filter(df["gbps"] >= percentile_80)
>>
>> # Show the filtered DataFrame
>> print(f"Filtered DataFrame")
>> filtered_df.show()
>>
>> print(f"Total rows in data frame = {df.count()}")
>> print(f"Result satisfying 80% percentile = {filtered_df.count()}")
>>
>> And this is the results
>>
>> +---------------+------------------+-------------------+
>> |   incoming_ips|              gbps|          date_time|
>> +---------------+------------------+-------------------+
>> |129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
>> | 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
>> |  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
>> |219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
>> |234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
>> |  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
>> | 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
>> |  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
>> |  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
>> | 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
>> | 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
>> |  34.103.211.39|  9.51081430594299|2023-02-05 

Re: Filter out 20% of rows

2023-09-16 Thread Bjørn Jørgensen
percentile_approx returns the approximate percentile(s). The memory consumption is
bounded. The larger the accuracy parameter we choose, the smaller the error we get.
The default accuracy value is 10000, to match the Hive default setting.
Choose a smaller value for a smaller memory footprint.
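
For instance, a minimal sketch of both knobs (the DataFrame and column are made
up for this example):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(1_000_000).withColumn("gbps", F.rand() * 10)

# SQL aggregate: a larger accuracy gives a smaller error at the cost of memory.
df.select(F.percentile_approx("gbps", 0.8, 100_000).alias("p80")).show()

# DataFrame API: relativeError plays the same role; 0.0 means exact but costly.
p80 = df.approxQuantile("gbps", [0.8], 0.01)[0]
print(p80)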

When I run my code on a single PC where N = 10 million, it takes 22.52
seconds. Notebook added.

I don't think that the question asker will have only returned the top
20 percentages.


lør. 16. sep. 2023 kl. 17:49 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

> Hi Bjorn,
>
> I thought that one is better off using percentile_approx as it seems to be
> the recommended approach for computing percentiles and can simplify the
> code.
> I have modified your code to use percentile_approx rather than manually
> computing it. It would be interesting to hear ideas on this.
>
> Here is the code:
>
> # Standard library imports
> import json
> import multiprocessing
> import os
> import re
> import sys
> import random
>
> # Third-party imports
> import numpy as np
> import pandas as pd
> import pyarrow
>
> # Pyspark imports
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession, functions as F, Window
> from pyspark.sql.functions import (
> col, concat, concat_ws, expr, lit, trim, udf
> )
> from pyspark.sql.types import (
> IntegerType, StringType, StructField, StructType,
> DoubleType, TimestampType
> )
> from pyspark import pandas as ps
>
> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  #
> e.g. 4015976448
> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
> conf.setMaster("local[{}]".format(number_cores))
> conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
> "spark.sql.repl.eagerEval.enabled", "True"
> ).set("spark.sql.adaptive.enabled", "True").set(
> "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
> ).set(
> "spark.sql.repl.eagerEval.maxNumRows", "1"
> )
>
> return
> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
>
> spark = get_spark_session("My super app", SparkConf())
> sc = SparkContext.getOrCreate()
> sc.setLogLevel("ERROR")
>
> def generate_ip():
> return ".".join(str(random.randint(0, 255)) for _ in range(4))
>
> def generate_timestamp():
> return pd.Timestamp(
> year=random.randint(2021, 2023),
> month=random.randint(1, 12),
> day=random.randint(1, 28),
> hour=random.randint(0, 23),
> minute=random.randint(0, 59),
> second=random.randint(0, 59)
> )
>
> def random_gbps():
> return random.uniform(0, 10)
>
> # Number of rows
> n = 20
>
> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
> "date_time": generate_timestamp()} for _ in range(n)]
> df = spark.createDataFrame(pd.DataFrame(data))
> df.show()
>
> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>
> windowRank = Window.orderBy(F.col("total_gbps").desc())
> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>
> # Calculate the 80th percentile value
> percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps,
> 0.8)").alias("percentile_80")).collect()[0]["percentile_80"]
>
> # Filter the DataFrame based on the condition
> filtered_df = df.filter(df["gbps"] >= percentile_80)
>
> # Show the filtered DataFrame
> print(f"Filtered DataFrame")
> filtered_df.show()
>
> print(f"Total rows in data frame = {df.count()}")
> print(f"Result satisfying 80% percentile = {filtered_df.count()}")
>
> And this is the results
>
> +---------------+------------------+-------------------+
> |   incoming_ips|              gbps|          date_time|
> +---------------+------------------+-------------------+
> |129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
> | 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
> |  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
> |219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
> |234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
> |  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
> | 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
> |  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
> |  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
> | 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
> | 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
> |  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
> |   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
> | 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
> | 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
> | 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
> |  18.76.138.108| 6.939770760444909|2022-04-01 

Re: Filter out 20% of rows

2023-09-16 Thread Mich Talebzadeh
Hi Bjorn,

I thought that one is better off using percentile_approx as it seems to be
the recommended approach for computing percentiles and can simplify the
code.
I have modified your code to use percentile_approx rather than manually
computing it. It would be interesting to hear ideas on this.

Here is the code:

# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
IntegerType, StringType, StructField, StructType,
DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  #
e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
conf.setMaster("local[{}]".format(number_cores))
conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
"spark.sql.repl.eagerEval.enabled", "True"
).set("spark.sql.adaptive.enabled", "True").set(
"spark.serializer", "org.apache.spark.serializer.KryoSerializer"
).set(
"spark.sql.repl.eagerEval.maxNumRows", "1"
)

return
SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My super app", SparkConf())
sc = SparkContext.getOrCreate()
sc.setLogLevel("ERROR")

def generate_ip():
return ".".join(str(random.randint(0, 255)) for _ in range(4))

def generate_timestamp():
return pd.Timestamp(
year=random.randint(2021, 2023),
month=random.randint(1, 12),
day=random.randint(1, 28),
hour=random.randint(0, 23),
minute=random.randint(0, 59),
second=random.randint(0, 59)
)

def random_gbps():
return random.uniform(0, 10)

# Number of rows
n = 20

data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time":
generate_timestamp()} for _ in range(n)]
df = spark.createDataFrame(pd.DataFrame(data))
df.show()

agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))

windowRank = Window.orderBy(F.col("total_gbps").desc())
agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))

# Calculate the 80th percentile value
percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps,
0.8)").alias("percentile_80")).collect()[0]["percentile_80"]

# Filter the DataFrame based on the condition
filtered_df = df.filter(df["gbps"] >= percentile_80)

# Show the filtered DataFrame
print(f"Filtered DataFrame")
filtered_df.show()

print(f"Total rows in data frame = {df.count()}")
print(f"Result satisfying 80% percentile = {filtered_df.count()}")

And this is the results

+---------------+------------------+-------------------+
|   incoming_ips|              gbps|          date_time|
+---------------+------------------+-------------------+
|129.189.130.141|2.6517421918102335|2021-09-06 08:29:25|
| 215.177.39.239|1.8210013026429361|2023-10-10 17:00:13|
|  78.202.71.184| 8.060958370556456|2022-02-22 04:25:03|
|219.100.198.137|0.3449002002472945|2023-09-28 01:39:44|
|234.234.156.107|2.6187481766507013|2022-11-16 11:33:41|
|  6.223.135.194|0.3510752223686242|2022-01-24 04:13:53|
| 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
|  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
|  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
| 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
| 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
|  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
|   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
| 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
| 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
| 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
|  18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
|    31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
|    135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
|104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
+---------------+------------------+-------------------+

Filtered DataFrame
+--------------+-----------------+-------------------+
|  incoming_ips|             gbps|          date_time|
+--------------+-----------------+-------------------+
| 163.26.238.22|  9.8999646499433|2023-01-12 17:54:44|
| 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
|98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
|145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
|   135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
+--------------+-----------------+-------------------+

Total rows in data frame = 20
Result satisfying 80% percentile = 5

Cheers
Mich 
