Example UDAF fails with "not serializable" exception

2024-04-06 Thread Owen Bell
https://spark.apache.org/docs/3.3.2/sql-ref-functions-udf-aggregate.html

I'm trying to run this example on Databricks, and it fails with the stack trace
below. It's literally copy-pasted from the example; what am I missing?



Job aborted due to stage failure: Task not serializable:
java.io.NotSerializableException: org.apache.spark.sql.TypedColumn
Serialization stack:
 - object not serializable (class: org.apache.spark.sql.TypedColumn, value:
   myaverage(knownnotnull(assertnotnull(input[0, $line867a2458f6df413bb3920f7fc2dbb8c381.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Average, true])).sum AS sum,
   knownnotnull(assertnotnull(input[0, $line867a2458f6df413bb3920f7fc2dbb8c381.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Average, true])).count AS count,
   newInstance(class $line867a2458f6df413bb3920f7fc2dbb8c381.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Average),
   boundreference()) AS average_salary)
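(The $read$$iw wrappers in the trace are REPL/notebook synthetic classes, so it looks like the non-serializable TypedColumn is being dragged in through the interpreter's closure rather than the example itself being wrong. Suggestions I have seen for this are to define the Aggregator in a standalone object (a package cell or an attached jar on Databricks) and register it with functions.udaf, instead of keeping the typed toColumn result in a notebook variable. Alternatively, if the goal is just the aggregate, here is a PySpark sketch using a grouped-aggregate pandas UDF, which avoids Scala REPL closure capture entirely; it reuses the sample salaries from the linked docs page and makes no claims about the Databricks runtime:)

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

# Grouped-aggregate pandas UDF (requires pyarrow): pd.Series in, scalar out.
@pandas_udf("double")
def my_average(salary: pd.Series) -> float:
    return float(salary.mean())

# Same sample rows as the employees.json used by the docs example.
df = spark.createDataFrame(
    [("Michael", 3000), ("Andy", 4500), ("Justin", 3500), ("Berta", 4000)],
    ["name", "salary"],
)

df.agg(my_average("salary").alias("average_salary")).show()  # 3750.0
```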


Re: External Spark shuffle service for k8s

2024-04-06 Thread Mich Talebzadeh
Thanks for your suggestion; I take it as a workaround. While this
workaround can potentially address storage allocation issues, I was more
interested in exploring solutions that offer more seamless integration
with large distributed file systems like HDFS, GCS, or S3, which would
ensure better performance and scalability for handling larger datasets
efficiently.
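
For what it is worth, the closest built-in step in that direction I am
aware of is executor decommissioning, which since Spark 3.1 can migrate
shuffle blocks to a remote fallback path instead of losing them with the
pod. A sketch only; the bucket path is a placeholder and the exact
behaviour should be verified on your cluster:

```python
from pyspark.sql import SparkSession

# Sketch: migrate shuffle blocks off decommissioned executors to remote
# storage (Spark 3.1+). The gs:// path is a placeholder, not a recommendation.
spark = (
    SparkSession.builder
    .appName("shuffle-fallback-sketch")
    .config("spark.decommission.enabled", "true")
    .config("spark.storage.decommission.enabled", "true")
    .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
    .config("spark.storage.decommission.fallbackStorage.path",
            "gs://some-bucket/spark-fallback/")  # placeholder bucket
    .getOrCreate()
)
```

Shuffle writes still land on local disk first, so this is not a true
remote shuffle service; the scalability question stands.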


Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London, United Kingdom

view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner Von Braun).


On Sat, 6 Apr 2024 at 21:28, Bjørn Jørgensen wrote:

> You can create a PVC on K8s and call it "300gb".
>
> Create a folder in your Dockerfile:
> WORKDIR /opt/spark/work-dir
> RUN chmod g+w /opt/spark/work-dir
>
> Then start Spark with these settings added:
>
> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.options.claimName",
> "300gb") \
>
> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.path",
> "/opt/spark/work-dir") \
>
> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.readOnly",
> "False") \
>
> .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.options.claimName",
> "300gb") \
>
> .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.path",
> "/opt/spark/work-dir") \
>
> .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.readOnly",
> "False") \
>   .config("spark.local.dir", "/opt/spark/work-dir")


Re: External Spark shuffle service for k8s

2024-04-06 Thread Bjørn Jørgensen
You can create a PVC on K8s and call it "300gb".

Create a folder in your Dockerfile:
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir

Then start Spark with these settings added:

.config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.options.claimName",
"300gb") \

.config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.path",
"/opt/spark/work-dir") \

.config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.readOnly",
"False") \

.config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.options.claimName",
"300gb") \

.config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.path",
"/opt/spark/work-dir") \

.config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.readOnly",
"False") \
  .config("spark.local.dir", "/opt/spark/work-dir")




On Sat, 6 Apr 2024 at 15:45, Mich Talebzadeh wrote:

> I have seen some older references to a shuffle service for k8s,
> although it is not clear whether they are talking about a generic
> shuffle service for k8s.
>
> Anyhow, with the advent of GenAI and the need to handle larger volumes
> of data, I was wondering whether there has been any more work on this.
> Specifically, large, scalable file systems like HDFS, GCS, and S3 offer
> significantly more storage capacity than the local disks on individual
> worker nodes in a k8s cluster, allowing much larger datasets to be
> handled efficiently. The degree of parallelism and fault tolerance
> offered by these file systems also comes into it. I would be interested
> to hear about any progress on this.
>
> Thanks

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-06 Thread 刘唯
This indeed looks like a bug. I will take some time to look into it.
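
In the meantime, a possible workaround sketch (assuming the `schema`
variable and topic names from this thread): do the windowed aggregation in
the DataFrame API, where the createTime watermark is honored, and use
.sql() only for the final projection. The ORDER BY is dropped because
sorting is not supported on streaming queries in append mode:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, sum as sum_

spark = SparkSession.builder.getOrCreate()

# Source as in the thread; `schema` is assumed to be defined as it is there.
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "payment_msg")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select("parsed_value.*")
    .withWatermark("createTime", "10 seconds")
)

# Aggregate in the DataFrame API so the watermark applies to the windows...
agg_df = (
    streaming_df
    .groupBy(col("provinceId"), window(col("createTime"), "1 hour", "30 minutes"))
    .agg(sum_(col("payAmount")).alias("totalPayAmount"))
)
agg_df.createOrReplaceTempView("agg_df")

# ...and use .sql() only for the projection (no ORDER BY in append mode).
(spark.sql("SELECT window.start, window.end, provinceId, totalPayAmount FROM agg_df")
 .writeStream
 .format("kafka")
 .option("checkpointLocation", "checkpoint")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("topic", "sink")
 .start())
```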

On Wed, 3 Apr 2024 at 01:55, Mich Talebzadeh wrote:

>
> hm. You are getting:
>
> AnalysisException: Append output mode not supported when there are
> streaming aggregations on streaming DataFrames/DataSets without watermark;
>
> The problem seems to be that you are using the append output mode when
> writing the streaming query results to Kafka. This mode is designed for
> scenarios where you want to append new data to an existing dataset at the
> sink (in this case, the "sink" topic in Kafka). However, your query
> involves a streaming aggregation, group by provinceId, window('createTime',
> '1 hour', '30 minutes'), and Spark Structured Streaming requires a
> watermark on such an aggregation so it knows when a window can be
> finalized and emitted in append mode.
>
> Your code already defines a watermark on the "createTime" column with a
> delay of 10 seconds (withWatermark("createTime", "10 seconds")). However,
> the error message indicates it is missing on the "start" column. Try
> adding a watermark on the "start" column generated by the window function;
> modify your code as below:
>
> from pyspark.sql.functions import col, from_json, window, sum
>
> streaming_df = session.readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "payment_msg") \
>   .option("startingOffsets", "earliest") \
>   .load() \
>   .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value")) \
>   .select("parsed_value.*") \
>   .withWatermark("createTime", "10 seconds")  # existing watermark on createTime
>
> # Modified section with watermark on the window-generated 'start' column
> streaming_df = streaming_df.groupBy(
>   col("provinceId"),
>   window(col("createTime"), "1 hour", "30 minutes")
> ).agg(
>   sum(col("payAmount")).alias("totalPayAmount")
> ).withWatermark("start", "10 seconds")  # withWatermark takes a column name string
>
> # Rest of the code remains the same
> streaming_df.createOrReplaceTempView("streaming_df")
>
> spark.sql("""
> SELECT
>   window.start, window.end, provinceId, totalPayAmount
> FROM streaming_df
> ORDER BY window.start
> """) \
> .writeStream \
> .format("kafka") \
> .option("checkpointLocation", "checkpoint") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("topic", "sink") \
> .start()
>
> Try and see how it goes
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer | Generative AI
> London, United Kingdom
>
> view my Linkedin profile
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand
> expert opinions" (Werner Von Braun).
>
> On Tue, 2 Apr 2024 at 22:43, Chloe He wrote:
>
>> Hi Mich,
>>
>> Thank you so much for your response. I really appreciate your help!
>>
>> You mentioned "defining the watermark using the withWatermark function on
>> the streaming_df before creating the temporary view" - I believe this is
>> what I’m doing and it’s not working for me. Here is the exact code snippet
>> that I’m running:
>>
>> ```
>> >>> streaming_df = session.readStream\
>> .format("kafka")\
>> .option("kafka.bootstrap.servers", "localhost:9092")\
>> .option("subscribe", "payment_msg")\
>> .option("startingOffsets","earliest")\
>> .load()\
>> .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value"))\
>> .select("parsed_value.*")\
>> .withWatermark("createTime", "10 seconds")
>>
>> >>> streaming_df.createOrReplaceTempView("streaming_df")
>>
>> >>> spark.sql("""
>> SELECT
>> window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>> FROM streaming_df
>> GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>> ORDER BY window.start
>> """)\
>>   .withWatermark("start", "10 seconds")\
>>   .writeStream\
>>   .format("kafka") \
>>   .option("checkpointLocation", "checkpoint") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("topic", "sink") \
>>   .start()
>>
>> AnalysisException: Append output mode not supported when there are
>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>> ```

Re: [External] Re: Issue of spark with antlr version

2024-04-06 Thread Bjørn Jørgensen
[[VOTE] Release Plan for Apache Spark 4.0.0 (June 2024)](
https://lists.apache.org/thread/r0zn6rd8y25yn2dg59ktw3ttrwxzqrfb)

Apache Spark 4.0.0 Release Plan
===

1. After creating `branch-3.5`, set "4.0.0-SNAPSHOT" in master branch.

2. Creating `branch-4.0` on April 1st, 2024.

3. Apache Spark 4.0.0 RC1 on May 1st, 2024.

4. Apache Spark 4.0.0 Release in June, 2024.

On Tue, 2 Apr 2024 at 12:06, Chawla, Parul wrote:

> ++ Ashima
>
> --
> *From:* Chawla, Parul 
> *Sent:* Tuesday, April 2, 2024 9:56 AM
> *To:* Bjørn Jørgensen ; user@spark.apache.org <
> user@spark.apache.org>
> *Cc:* Sahni, Ashima ;
> user@spark.apache.org ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Re: [External] Re: Issue of spark with antlr version
>
> Hi Team,
> Any update on the query below: when will Spark 4.x be released to Maven?
> On the Maven site I can only see spark-core 3.5.1.
>
> Regards,
> Parul
>
> --
> *From:* Chawla, Parul 
> *Sent:* Monday, April 1, 2024 4:20 PM
> *To:* Bjørn Jørgensen 
> *Cc:* Sahni, Ashima ;
> user@spark.apache.org ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>; Mekala, Rajesh <
> r.mek...@accenture.com>; Grandhi, Venkatesh <
> venkatesh.a.gran...@accenture.com>; George, Rejish <
> rejish.geo...@accenture.com>; Tayal, Aayushi 
> *Subject:* Re: [External] Re: Issue of spark with antlr version
>
> Hi Team,
>
> Can you let us know when Spark 4.x will be released to Maven?
>
> regards,
> Parul
>
> --
> *From:* Bjørn Jørgensen 
> *Sent:* Wednesday, February 28, 2024 5:06:54 PM
> *To:* Chawla, Parul 
> *Cc:* Sahni, Ashima ;
> user@spark.apache.org ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>; Mekala, Rajesh <
> r.mek...@accenture.com>; Grandhi, Venkatesh <
> venkatesh.a.gran...@accenture.com>; George, Rejish <
> rejish.geo...@accenture.com>; Tayal, Aayushi 
> *Subject:* Re: [External] Re: Issue of spark with antlr version
>
> [image: image.png]
>
> On Wed, 28 Feb 2024 at 11:28, Chawla, Parul <parul.cha...@accenture.com> wrote:
>
>
> Hi,
> Can you tell us the Spark version in which this is resolved?
> --
> *From:* Bjørn Jørgensen 
> *Sent:* Tuesday, February 27, 2024 7:05:36 PM
> *To:* Sahni, Ashima 
> *Cc:* Chawla, Parul ; user@spark.apache.org <
> user@spark.apache.org>; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>; Mekala, Rajesh <
> r.mek...@accenture.com>; Grandhi, Venkatesh <
> venkatesh.a.gran...@accenture.com>; George, Rejish <
> rejish.geo...@accenture.com>; Tayal, Aayushi 
> *Subject:* [External] Re: Issue of spark with antlr version
>
> [SPARK-44366][BUILD] Upgrade antlr4 to 4.13.1
> 
>
>
> On Tue, 27 Feb 2024 at 13:25, Sahni, Ashima wrote:
>
> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using Spring Framework 5.3.31. To upgrade it
> to 6.x, per our application dependencies we must upgrade the Spark and
> Hibernate jars as well.
>
> With the compatible Hibernate upgrade, the dependent antlr4 jar version
> has been upgraded to 4.10.1, but there is no Spark version available with
> the upgraded antlr4 jar.
>
> Can you please tell us when we can have an updated Spark version with the
> upgraded antlr4 version?
>
>
>
>
>
> Regards,
>
> Parul
>

Unsubscribe

2024-04-06 Thread rau-jannik
Unsubscribe




External Spark shuffle service for k8s

2024-04-06 Thread Mich Talebzadeh
I have seen some older references to a shuffle service for k8s,
although it is not clear whether they are talking about a generic
shuffle service for k8s.

Anyhow, with the advent of GenAI and the need to handle larger volumes
of data, I was wondering whether there has been any more work on this.
Specifically, large, scalable file systems like HDFS, GCS, and S3 offer
significantly more storage capacity than the local disks on individual
worker nodes in a k8s cluster, allowing much larger datasets to be
handled efficiently. The degree of parallelism and fault tolerance
offered by these file systems also comes into it. I would be interested
to hear about any progress on this.

Thanks

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London, United Kingdom

view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner Von Braun).
