Re: External Spark shuffle service for k8s

2024-04-07 Thread Enrico Minack

There is the Apache incubator project Uniffle:
https://github.com/apache/incubator-uniffle

It stores shuffle data on remote servers in memory, on local disk and in HDFS.

Cheers,
Enrico
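A minimal sketch of the Spark-side settings for Uniffle. It assumes the key names and the RssShuffleManager class given in the Uniffle README; please verify them against the release you deploy. The Uniffle client JAR must also be on Spark's classpath. The coordinator host names are hypothetical placeholders, and the helper simply renders the settings as spark-defaults.conf lines.

```python
"""Sketch: Spark client settings for the Apache Uniffle remote shuffle service.

ASSUMPTION: keys and class name follow the Uniffle README; check them against
the Uniffle release you deploy. Host names used below are hypothetical.
"""


def uniffle_conf(coordinators):
    """Return Spark settings that route shuffle data through Uniffle."""
    return {
        # Replace Spark's built-in shuffle manager with Uniffle's client.
        "spark.shuffle.manager": "org.apache.spark.shuffle.RssShuffleManager",
        # Comma-separated host:port list of Uniffle coordinators.
        "spark.rss.coordinator.quorum": ",".join(coordinators),
    }


def to_spark_defaults(conf):
    """Render settings in spark-defaults.conf format (key, space, value)."""
    return "".join(f"{key} {value}\n" for key, value in sorted(conf.items()))


if __name__ == "__main__":
    conf = uniffle_conf(["uniffle-coordinator-0:19999", "uniffle-coordinator-1:19999"])
    print(to_spark_defaults(conf), end="")
```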


On 06.04.24 at 15:41, Mich Talebzadeh wrote:

I have seen some older references to a shuffle service for k8s,
although it is not clear whether they are talking about a generic shuffle
service for k8s.

Anyhow, with the advent of GenAI and the need to handle larger
volumes of data, I was wondering if there has been any more work on
this matter. Specifically, large and scalable file systems like HDFS,
GCS, S3, etc. offer significantly larger storage capacity than the local
disks on individual worker nodes in a k8s cluster, thus allowing much
larger datasets to be handled more efficiently. The degree of
parallelism and fault tolerance of these file systems also comes into
it. I would be interested in hearing more about any progress on this.

Thanks

Mich Talebzadeh,

Technologist | Solutions Architect | Data Engineer  | Generative AI

London
United Kingdom


view my Linkedin profile


  https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org







Spark UDAF in examples fail with not serializable error

2024-04-07 Thread Owen Bell
The type-safe example given at
https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html
fails with a not-serializable exception.

Is this a known issue?


Re: Idiomatic way to rate-limit streaming sources to avoid OutOfMemoryError?

2024-04-07 Thread Mich Talebzadeh
OK,

This is a common issue in Spark Structured Streaming (SSS), where the
source generates data faster than Spark can process it. SSS doesn't have a
generic, built-in mechanism for rate-limiting the incoming data stream
itself; limits are configured per source. That said, consider the following:


   - Limit the rate at which data is produced. This can involve configuring
   the data source itself to emit data at a controlled rate or implementing
   rate limiting mechanisms in the application or system that produces the
   data.
   - Backpressure as such belongs to the older DStream API (Spark
   Streaming): spark.streaming.backpressure.enabled and
   spark.streaming.backpressure.initialRate have no effect on SSS queries.
   In SSS the equivalent control is a per-trigger limit on the source, for
   example maxOffsetsPerTrigger for Kafka or maxFilesPerTrigger for file
   sources. A custom source such as yours can offer the same by
   implementing SupportsAdmissionControl, whose latestOffset(startOffset,
   limit) receives a ReadLimit (e.g. ReadLimit.maxRows(n)) and returns an
   end offset capped accordingly.
   - Consider adjusting the trigger interval to control how often
   micro-batches run. It is set on the writer, e.g.
   .trigger(processingTime="10 seconds"), rather than through spark.conf.
   Increasing the interval reduces the frequency of processing and gives
   each batch more time to complete. Note, however, that unless the source
   caps each batch, a longer interval also means each batch must absorb
   more accumulated data, so on its own it does not prevent out-of-memory
   errors.
   - Dynamic Resource Allocation (DRA) that is aware of streaming workloads
   is not implemented for SSS yet. Such a DRA would automatically adjust
   allocated resources based on workload, ensuring Spark has enough
   resources to process incoming data within the trigger interval and
   preventing backlogs and potential OOM issues.
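The first option, limiting the rate at the producer, is commonly done with a token bucket. Below is a minimal stdlib sketch, assuming the producer is a Python process writing records to the socket; the class, its parameters and the usage loop are illustrative, not part of any Spark API. The clock is injectable so the logic can be tested without sleeping.

```python
import time


class TokenBucket:
    """Token-bucket rate limiter for a data producer.

    Allows `rate` records per second on average, with bursts of up to
    `capacity` records.
    """

    def __init__(self, rate, capacity, clock=time.monotonic):
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self.rate = float(rate)
        self.capacity = float(capacity)
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.last = clock()

    def _refill(self):
        now = self.clock()
        # Add tokens for the elapsed time, never exceeding the capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self, n=1):
        """Consume n tokens and return True, or return False if too few are left."""
        self._refill()
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

    def acquire(self, n=1):
        """Block until n tokens have been consumed."""
        if n > self.capacity:
            raise ValueError("n exceeds the bucket capacity and could never be granted")
        while not self.try_acquire(n):
            # Sleep roughly until the missing tokens will have accumulated.
            time.sleep(max((n - self.tokens) / self.rate, 0.001))


# Usage in a hypothetical producer loop:
# bucket = TokenBucket(rate=5_000, capacity=500)
# for line in records:
#     bucket.acquire()
#     conn.sendall(line)
```

The bucket size bounds the burst Spark can see between two triggers, while the rate bounds the sustained load; both would need tuning against the measured processing rate.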


From the Spark UI, look at the Structured Streaming tab, which has
statistics per query. In general, the batch duration has to stay below the
trigger interval, and the process rate has to keep up with the input rate;
a persistent gap between the two is an additional indicator of a growing
backlog.
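A related check can be automated from the driver: compare each batch's duration with the trigger interval, and the processing rate with the input rate. A minimal sketch, assuming PySpark 3.x, where query.lastProgress returns the StreamingQueryProgress as a dict containing inputRowsPerSecond, processedRowsPerSecond and durationMs.triggerExecution; the function name and the two criteria are illustrative choices, not a Spark API.

```python
def batch_health(progress, trigger_interval_ms):
    """Check one streaming progress report against the trigger interval.

    `progress` is the dict from query.lastProgress (PySpark 3.x). Spark can
    omit the rate fields when they are undefined (e.g. NaN), hence the defaults.
    """
    duration_ms = progress["durationMs"]["triggerExecution"]
    input_rate = progress.get("inputRowsPerSecond", 0.0)
    process_rate = progress.get("processedRowsPerSecond", 0.0)
    return {
        # The batch finished before the next trigger was due.
        "batch_within_interval": duration_ms <= trigger_interval_ms,
        # Rows are processed at least as fast as they arrive.
        "keeping_up": process_rate >= input_rate,
    }


# Usage, e.g. polled periodically on the driver:
# progress = query.lastProgress
# if progress is not None:
#     print(batch_health(progress, trigger_interval_ms=10_000))
```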

HTH

Mich Talebzadeh,


On Sun, 7 Apr 2024 at 15:11, Baran, Mert  wrote:

> Hi Spark community,
>
> I have a Spark Structured Streaming application that reads data from a
> socket source (implemented very similarly to the
> TextSocketMicroBatchStream). The issue is that the source can generate
> data faster than Spark can process it, eventually leading to an
> OutOfMemoryError when Spark runs out of memory trying to queue up all
> the pending data.
>
> I'm looking for advice on the most idiomatic/recommended way in Spark to
> rate-limit data ingestion to avoid overwhelming the system.
>
> Approaches I've considered:
>
> 1. Using a BlockingQueue with a fixed size to throttle the data.
> However, this requires careful tuning of the queue size. If too small,
> it limits throughput; if too large, you risk batches taking too long.
>
> 2. Fetching a limited number of records in the PartitionReader's next(),
> adding the records into a queue and checking if the queue is empty.
> However, I'm not sure if there is a built-in way to dynamically scale
> the number of records fetched (i.e., dynamically calculating the offset)
> based on the system load and capabilities.
>
> So in summary, what is the recommended way to dynamically rate-limit a
> streaming source to match Spark's processing capacity and avoid
> out-of-memory issues? Are there any best practices or configuration
> options I should look at?
> Any guidance would be much appreciated! Let me know if you need any
> other details.
>
> Thanks,
> Mert
>
>
>
>


Re: External Spark shuffle service for k8s

2024-04-07 Thread Mich Talebzadeh
Thanks Cheng for the heads up. I will have a look.

Cheers

Mich Talebzadeh,


On Sun, 7 Apr 2024 at 15:08, Cheng Pan  wrote:

> Instead of the External Shuffle Service, Apache Celeborn might be a good option
> as a Remote Shuffle Service for Spark on K8s.
>
> There are some useful resources you might be interested in.
>
> [1] https://celeborn.apache.org/
> [2] https://www.youtube.com/watch?v=s5xOtG6Venw
> [3] https://github.com/aws-samples/emr-remote-shuffle-service
> [4] https://github.com/apache/celeborn/issues/2140
>
> Thanks,
> Cheng Pan
>


Re: External Spark shuffle service for k8s

2024-04-07 Thread Vakaris Baškirov
There is an IBM shuffle service plugin that supports S3
https://github.com/IBM/spark-s3-shuffle

I would think, though, that a feature like this could be part of the main
Spark repo. Trino already has out-of-the-box support for S3 exchange
(shuffle), and it's very useful.

Vakaris

On Sun, Apr 7, 2024 at 12:27 PM Mich Talebzadeh 
wrote:

>
> Thanks for your suggestion, which I take as a workaround. Whilst this
> workaround can potentially address storage allocation issues, I was more
> interested in exploring solutions that offer more seamless integration
> with large distributed file systems like HDFS, GCS, or S3, which would
> ensure better performance and scalability when handling larger datasets.
>
>
> Mich Talebzadeh,
>
>
> On Sat, 6 Apr 2024 at 21:28, Bjørn Jørgensen 
> wrote:
>
>> You can create a PVC on K8s and call it 300gb.
>>
>> Create the folder in your Dockerfile:
>> WORKDIR /opt/spark/work-dir
>> RUN chmod g+w /opt/spark/work-dir
>>
>> Then start Spark with these settings added:
>>
>> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.options.claimName", "300gb") \
>> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.path", "/opt/spark/work-dir") \
>> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.readOnly", "false") \
>> .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.options.claimName", "300gb") \
>> .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.path", "/opt/spark/work-dir") \
>> .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.readOnly", "false") \
>> .config("spark.local.dir", "/opt/spark/work-dir")
>>
>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>
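Bjørn's PVC settings quoted above follow a fixed key pattern, spark.kubernetes.<driver|executor>.volumes.persistentVolumeClaim.<volume name>.*. A small sketch that generates them, so the driver and executor entries cannot drift apart; the helper name is hypothetical. One caveat if a single claim is shared: executors on different nodes can only mount the same PVC if its storage class supports the ReadWriteMany access mode.

```python
def pvc_conf(volume_name, claim_name, mount_path, read_only=False,
             roles=("driver", "executor")):
    """Build the Spark-on-K8s settings that mount one PVC into the pods."""
    conf = {}
    for role in roles:
        prefix = f"spark.kubernetes.{role}.volumes.persistentVolumeClaim.{volume_name}"
        conf[f"{prefix}.options.claimName"] = claim_name  # name of the existing PVC
        conf[f"{prefix}.mount.path"] = mount_path  # path inside the container
        conf[f"{prefix}.mount.readOnly"] = str(read_only).lower()  # "true"/"false"
    return conf


if __name__ == "__main__":
    conf = pvc_conf("300gb", "300gb", "/opt/spark/work-dir")
    # Point Spark's scratch space (shuffle files, spills) at the mounted volume.
    conf["spark.local.dir"] = "/opt/spark/work-dir"
    for key, value in sorted(conf.items()):
        print(f'.config("{key}", "{value}") \\')
```

With PySpark, each pair can equally be passed to SparkSession.builder.config(key, value) in a loop.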


Idiomatic way to rate-limit streaming sources to avoid OutOfMemoryError?

2024-04-07 Thread Baran, Mert

Hi Spark community,

I have a Spark Structured Streaming application that reads data from a 
socket source (implemented very similarly to the 
TextSocketMicroBatchStream). The issue is that the source can generate 
data faster than Spark can process it, eventually leading to an 
OutOfMemoryError when Spark runs out of memory trying to queue up all 
the pending data.


I'm looking for advice on the most idiomatic/recommended way in Spark to 
rate-limit data ingestion to avoid overwhelming the system.


Approaches I've considered:

1. Using a BlockingQueue with a fixed size to throttle the data. 
However, this requires careful tuning of the queue size. If too small, 
it limits throughput; if too large, you risk batches taking too long.


2. Fetching a limited number of records in the PartitionReader's next(), 
adding the records into a queue and checking if the queue is empty. 
However, I'm not sure if there is a built-in way to dynamically scale 
the number of records fetched (i.e., dynamically calculating the offset) 
based on the system load and capabilities.
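The second approach can be made adaptive. In Spark 3.x a custom source can implement SupportsAdmissionControl, so that Spark calls latestOffset(startOffset, limit) with a ReadLimit and the source returns a capped end offset. The Python sketch below shows only the decision logic, under stated assumptions: offsets are plain integers, AdaptiveReadLimit is a hypothetical helper (not a Spark API), and additive-increase/multiplicative-decrease is just one possible policy. How the previous batch's duration reaches the source is left open; the interval between successive commit() calls is one possible proxy.

```python
class AdaptiveReadLimit:
    """Hypothetical helper (not a Spark API) that sizes micro-batches.

    Grows the per-batch row cap additively while batches finish within
    `target_ms`, and halves it when a batch overruns (AIMD).
    """

    def __init__(self, target_ms, initial=1_000, floor=100, ceiling=1_000_000, step=500):
        self.target_ms = target_ms
        self.floor = floor
        self.ceiling = ceiling
        self.step = step
        self.max_rows = max(floor, min(initial, ceiling))

    def end_offset(self, start, latest):
        """Cap the next batch at `max_rows` records past `start`."""
        return min(latest, start + self.max_rows)

    def record_batch(self, duration_ms):
        """Adapt the cap using the duration of the batch that just finished."""
        if duration_ms > self.target_ms:
            self.max_rows = max(self.floor, self.max_rows // 2)  # back off quickly
        else:
            self.max_rows = min(self.ceiling, self.max_rows + self.step)  # probe upwards


if __name__ == "__main__":
    limit = AdaptiveReadLimit(target_ms=1_000)
    start, latest = 0, 50_000  # pretend 50k records are waiting in the buffer
    for duration_ms in (400, 600, 1_500, 900):
        end = limit.end_offset(start, latest)
        print(f"batch [{start}, {end}) took {duration_ms} ms")
        start = end
        limit.record_batch(duration_ms)
```

Records beyond the returned end offset stay in the source's buffer, so that buffer still needs a bound (for example your BlockingQueue) to keep memory finite.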


So in summary, what is the recommended way to dynamically rate-limit a 
streaming source to match Spark's processing capacity and avoid 
out-of-memory issues? Are there any best practices or configuration 
options I should look at?
Any guidance would be much appreciated! Let me know if you need any 
other details.


Thanks,
Mert





Re: External Spark shuffle service for k8s

2024-04-07 Thread Cheng Pan
Instead of the External Shuffle Service, Apache Celeborn might be a good option as a
Remote Shuffle Service for Spark on K8s.

There are some useful resources you might be interested in.

[1] https://celeborn.apache.org/
[2] https://www.youtube.com/watch?v=s5xOtG6Venw
[3] https://github.com/aws-samples/emr-remote-shuffle-service
[4] https://github.com/apache/celeborn/issues/2140

Thanks,
Cheng Pan
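For orientation, a sketch of the client-side settings that, as I recall, Celeborn's Spark documentation describes: Celeborn's SparkShuffleManager class, the master endpoints, and the external shuffle service switched off. Please verify the keys against the Celeborn version you deploy; the Celeborn Spark client JAR must also be on the classpath. The master host names are hypothetical, and conflicting_keys is an illustrative helper for spotting settings an existing job already sets differently.

```python
def celeborn_conf(master_endpoints):
    """Spark client settings for Apache Celeborn (ASSUMED keys; verify per release)."""
    return {
        # Celeborn's shuffle manager for Spark 3.x replaces the built-in one.
        "spark.shuffle.manager": "org.apache.spark.shuffle.celeborn.SparkShuffleManager",
        # Comma-separated host:port list of Celeborn masters.
        "spark.celeborn.master.endpoints": ",".join(master_endpoints),
        # Shuffle data lives on Celeborn workers, so the node-local ESS is not used.
        "spark.shuffle.service.enabled": "false",
    }


def conflicting_keys(existing, overlay):
    """Keys that `existing` already sets to a different value than `overlay`."""
    return sorted(k for k, v in overlay.items() if k in existing and existing[k] != v)


if __name__ == "__main__":
    conf = celeborn_conf(["celeborn-master-0:9097", "celeborn-master-1:9097"])
    current = {"spark.shuffle.service.enabled": "true"}  # e.g. from spark-defaults.conf
    print("would override:", conflicting_keys(current, conf))
    # With PySpark: for key, value in conf.items(): builder = builder.config(key, value)
```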





Re: External Spark shuffle service for k8s

2024-04-07 Thread Mich Talebzadeh
Splendid

The configurations below can be used with k8s deployments of Spark. Spark
applications running on k8s can utilize these configurations to seamlessly
access data stored in Google Cloud Storage (GCS) and Amazon S3.

For Google GCS, we may have:

spark_config_gcs = {
    "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.hadoop.google.cloud.auth.service.account.enable": "true",
    "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "/path/to/keyfile.json",
}

For Amazon S3, similarly:

spark_config_s3 = {
    "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.access.key": "s3_access_key",
    "spark.hadoop.fs.s3a.secret.key": "secret_key",
}


To implement these configurations and enable Spark applications to interact
with GCS and S3, I guess we can approach it this way:

1) Spark Repository Integration: These configurations need to be added to
the Spark repository as part of the supported configuration options for k8s
deployments.

2) Configuration Settings: Users need to specify these configurations when
submitting Spark applications to a Kubernetes cluster. They can include
these configurations in the Spark application code or pass them as
command-line arguments or environment variables during application
submission.
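Point 2, passing the settings at submission time, can be sketched as follows: merge one or more settings dictionaries like the GCS and S3 ones above and render them as spark-submit --conf arguments. This is a minimal sketch; the function name, master URL and application path are hypothetical placeholders. Real credentials are better supplied through Kubernetes secrets than on the command line, where they show up in process listings. The same merged dictionary could instead be applied in code via SparkSession.builder.config(key, value).

```python
import shlex


def spark_submit_command(app, master, *confs):
    """Build a spark-submit command line from one or more settings dicts.

    Later dicts win on duplicate keys (e.g. a shared serviceAccountName).
    """
    merged = {}
    for conf in confs:
        merged.update(conf)
    cmd = ["spark-submit", "--master", master, "--deploy-mode", "cluster"]
    for key, value in sorted(merged.items()):
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app)
    return shlex.join(cmd)  # quotes any argument that needs it (Python 3.8+)


if __name__ == "__main__":
    spark_config_gcs = {
        "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    }
    print(spark_submit_command("local:///opt/spark/app/main.py",  # hypothetical path
                               "k8s://https://kubernetes.example:443",  # hypothetical URL
                               spark_config_gcs))
```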

HTH

Mich Talebzadeh,



On Sun, 7 Apr 2024 at 13:31, Vakaris Baškirov 
wrote:

> There is an IBM shuffle service plugin that supports S3
> https://github.com/IBM/spark-s3-shuffle
>
> Though I would think a feature like this could be a part of the main Spark
> repo. Trino already has out-of-box support for s3 exchange (shuffle) and
> it's very useful.
>
> Vakaris
>