Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-02-29 Thread Prem Sahoo
Congratulations Sent from my iPhoneOn Feb 29, 2024, at 4:54 PM, Xinrong Meng  wrote:Congratulations!Thanks,XinrongOn Thu, Feb 29, 2024 at 11:16 AM Dongjoon Hyun  wrote:Congratulations!Bests,Dongjoon.On Wed, Feb 28, 2024 at 11:43 AM beliefer  wrote:Congratulations!At 2024-02-28 17:43:25, "Jungtaek Lim"  wrote:Hi everyone,We are happy to announce the availability of Spark 3.5.1!Spark 3.5.1 is a maintenance release containing stability fixes. Thisrelease is based on the branch-3.5 maintenance branch of Spark. We stronglyrecommend all 3.5 users to upgrade to this stable release.To download Spark 3.5.1, head over to the download page:https://spark.apache.org/downloads.htmlTo view the release notes:https://spark.apache.org/releases/spark-release-3-5-1.htmlWe would like to acknowledge all community members for contributing to thisrelease. This release would not have been possible without you.Jungtaek Limps. Yikun is helping us through releasing the official docker image for Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally available.




Re: pyspark dataframe join with two different data type

2024-02-29 Thread Mich Talebzadeh
This is what you want: how to join two DataFrames where the join column is a
single value in one and an array in the other, keeping only the rows where the
value is present in the array. The example below uses integers; the same
pattern applies to strings.

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("joins").getOrCreate()

# data1 has a column combined_id holding an array of integers
data1 = [Row(combined_id=[1, 2, 3]), Row(combined_id=[4, 5, 6])]
# data2 has a column mr_id holding single integers
data2 = [Row(mr_id=2), Row(mr_id=5)]

df1 = spark.createDataFrame(data1)
df2 = spark.createDataFrame(data2)

df1.printSchema()
df2.printSchema()

# Perform the join with array_contains. It takes two arguments: an array
and a value. It returns True if the value exists as an element within the
array, otherwise False.
joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))

# Show the result
joined_df.show()

root
 |-- combined_id: array (nullable = true)
 ||-- element: long (containsNull = true)

root
 |-- mr_id: long (nullable = true)

+-----------+-----+
|combined_id|mr_id|
+-----------+-----+
|  [1, 2, 3]|    2|
|  [4, 5, 6]|    5|
+-----------+-----+
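
For the specific shape in the original question, where one side is a plain
string id and the other side holds an array of strings, the same
array_contains pattern applies. A minimal sketch along those lines (the ids
are taken from the question; the column names combined_id and id are
assumptions):

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("joins").getOrCreate()

# df1 carries an array-of-strings column, df2 a plain string column.
df1 = spark.createDataFrame([
    Row(combined_id=["35122806-4cd2-4916-a149-24ea55c2dc36",
                     "a5f03625-6cc5-49df-95eb-df741fe9139b"])
])
df2 = spark.createDataFrame([Row(id="35122806-4cd2-4916-a149-24ea55c2dc36")])

# Inner join keeps only the rows whose array contains the string id.
joined = df1.join(df2, expr("array_contains(combined_id, id)"), "inner")
joined.show(truncate=False)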

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Thu, 29 Feb 2024 at 20:50, Karthick Nk  wrote:

> Hi All,
>
> I have two dataframe with below structure, i have to join these two
> dataframe - the scenario is one column is string in one dataframe and in
> other df join column is array of string, so we have to inner join two df
> and get the data if string value is present in any of the array of string
> value in another dataframe,
>
>
> df1 = spark.sql("""
> SELECT
> mr.id as mr_id,
> pv.id as pv_id,
> array(mr.id, pv.id) as combined_id
> FROM
> table1 mr
> INNER JOIN table2 pv ON pv.id = Mr.recordid
>where
> pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
> or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
> """)
>
> # df1.display()
>
> # Your second query
> df2 = spark.sql("""
> SELECT
> id
> FROM
> table2
> WHERE
> id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>
> """)
>
>
>
> Result data:
> 35122806-4cd2-4916-a149-24ea55c2dc36 only, because this records alone is
> common between string and array of string value.
>
> Can you share the sample snippet, how we can do the join for this two
> different datatype in the dataframe.
>
> if any clarification needed, pls feel free to ask.
>
> Thanks
>
>


pyspark dataframe join with two different data type

2024-02-29 Thread Karthick Nk
Hi All,

I have two dataframes with the structure below, and I have to join them.
The scenario: the join column is a string in one dataframe and an array of
strings in the other, so we have to inner join the two dataframes and keep
the data only if the string value is present in the array-of-strings value
of the other dataframe.


df1 = spark.sql("""
SELECT
mr.id as mr_id,
pv.id as pv_id,
array(mr.id, pv.id) as combined_id
FROM
table1 mr
INNER JOIN table2 pv ON pv.id = mr.recordid
WHERE
pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
""")

# df1.display()

# Your second query
df2 = spark.sql("""
SELECT
id
FROM
table2
WHERE
id = '35122806-4cd2-4916-a149-24ea55c2dc36'

""")



Expected result:
35122806-4cd2-4916-a149-24ea55c2dc36 only, because this record alone is
common between the string value and the array-of-strings value.

Can you share a sample snippet showing how we can do the join for these two
different data types in the dataframes?
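
For reference, one hedged sketch of how the join itself might then be
expressed, reusing the df1 (array column combined_id) and df2 (string column
id) built by the two queries above; this is only an illustration, not a
tested answer:

from pyspark.sql.functions import expr

# Inner join: keep a row only when df2.id is an element of df1.combined_id.
result = df1.join(df2, expr("array_contains(combined_id, id)"), "inner")
result.show(truncate=False)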

If any clarification is needed, please feel free to ask.

Thanks


Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-02-29 Thread Xinrong Meng
Congratulations!

Thanks,
Xinrong

On Thu, Feb 29, 2024 at 11:16 AM Dongjoon Hyun 
wrote:

> Congratulations!
>
> Bests,
> Dongjoon.
>
> On Wed, Feb 28, 2024 at 11:43 AM beliefer  wrote:
>
>> Congratulations!
>>
>>
>>
>> At 2024-02-28 17:43:25, "Jungtaek Lim" 
>> wrote:
>>
>> Hi everyone,
>>
>> We are happy to announce the availability of Spark 3.5.1!
>>
>> Spark 3.5.1 is a maintenance release containing stability fixes. This
>> release is based on the branch-3.5 maintenance branch of Spark. We
>> strongly
>> recommend all 3.5 users to upgrade to this stable release.
>>
>> To download Spark 3.5.1, head over to the download page:
>> https://spark.apache.org/downloads.html
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-3-5-1.html
>>
>> We would like to acknowledge all community members for contributing to
>> this
>> release. This release would not have been possible without you.
>>
>> Jungtaek Lim
>>
>> ps. Yikun is helping us through releasing the official docker image for
>> Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally available.
>>
>>


Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-02-28 Thread Dongjoon Hyun
Congratulations!

Bests,
Dongjoon.

On Wed, Feb 28, 2024 at 11:43 AM beliefer  wrote:

> Congratulations!
>
>
>
> At 2024-02-28 17:43:25, "Jungtaek Lim" 
> wrote:
>
> Hi everyone,
>
> We are happy to announce the availability of Spark 3.5.1!
>
> Spark 3.5.1 is a maintenance release containing stability fixes. This
> release is based on the branch-3.5 maintenance branch of Spark. We strongly
> recommend all 3.5 users to upgrade to this stable release.
>
> To download Spark 3.5.1, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-5-1.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
>
> Jungtaek Lim
>
> ps. Yikun is helping us through releasing the official docker image for
> Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally available.
>
>


Re: [External] Re: Issue of spark with antlr version

2024-02-28 Thread Chawla, Parul

Hi,
Can we get the Spark version in which this is resolved?

From: Bjørn Jørgensen 
Sent: Tuesday, February 27, 2024 7:05:36 PM
To: Sahni, Ashima 
Cc: Chawla, Parul ; user@spark.apache.org 
; Misra Parashar, Jyoti 
; Mekala, Rajesh ; 
Grandhi, Venkatesh ; George, Rejish 
; Tayal, Aayushi 
Subject: [External] Re: Issue of spark with antlr version

CAUTION: External email. Be cautious with links and attachments.

[SPARK-44366][BUILD] Upgrade antlr4 to 4.13.1


On Tue, Feb 27, 2024 at 13:25, Sahni, Ashima wrote:

Hi Team,



Can you please let us know the update on below.



Thanks,

Ashima



From: Chawla, Parul 
mailto:parul.cha...@accenture.com>>
Sent: Sunday, February 25, 2024 11:57 PM
To: user@spark.apache.org
Cc: Sahni, Ashima 
mailto:ashima.sa...@accenture.com>>; Misra 
Parashar, Jyoti 
mailto:jyoti.misra.paras...@accenture.com>>
Subject: Issue of spark with antlr version



Hi Spark Team,





Our application is currently using Spring Framework 5.3.31. To upgrade it to
6.x, per application dependencies we must upgrade the Spark and Hibernate jars
as well.

With the Hibernate-compatible upgrade, the dependent Antlr4 jar version has been
upgraded to 4.10.1, but there is no Spark version available with the upgraded
Antlr4 jar.

Can you please let us know when a Spark version with the upgraded Antlr4
version will be available?





Regards,

Parul



This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security, AI-powered support capabilities, and assessment of 
internal compliance with Accenture policy. Your privacy is important to us. 
Accenture uses your personal data only in compliance with data protection laws. 
For further information on how Accenture processes your personal data, please 
see our privacy statement at https://www.accenture.com/us-en/privacy-policy.
__

www.accenture.com


--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: [External] Re: Issue of spark with antlr version

2024-02-28 Thread Bjørn Jørgensen
[image: image.png]

On Wed, Feb 28, 2024 at 11:28, Chawla, Parul wrote:

>
> Hi ,
> Can we get spark version on whuich this is resolved.
> --
> *From:* Bjørn Jørgensen 
> *Sent:* Tuesday, February 27, 2024 7:05:36 PM
> *To:* Sahni, Ashima 
> *Cc:* Chawla, Parul ; user@spark.apache.org <
> user@spark.apache.org>; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>; Mekala, Rajesh <
> r.mek...@accenture.com>; Grandhi, Venkatesh <
> venkatesh.a.gran...@accenture.com>; George, Rejish <
> rejish.geo...@accenture.com>; Tayal, Aayushi 
> *Subject:* [External] Re: Issue of spark with antlr version
>
> *CAUTION:* External email. Be cautious with links and attachments.
> [SPARK-44366][BUILD] Upgrade antlr4 to 4.13.1
> 
>
>
> tir. 27. feb. 2024 kl. 13:25 skrev Sahni, Ashima
> :
>
> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using spring framrwork 5.3.31 .To upgrade it
> to 6.x , as per application dependency we must upgrade Spark and
> Hibernate jars as well .
>
> With Hibernate compatible upgrade, the dependent Antlr4 jar version has
> been upgraded to 4.10.1 but there’s no Spark version available with the
> upgraded Antlr4 jar.
>
> Can u please update when we can have updated version with upgraded antl4
> version..
>
>
>
>
>
> Regards,
>
> Parul
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Where allowed
> by local law, electronic communications with Accenture and its affiliates,
> including e-mail and instant messaging (including content), may be scanned
> by our systems for the purposes of information security, AI-powered support
> capabilities, and assessment of internal compliance with Accenture policy.
> Your privacy is important to us. Accenture uses your personal data only in
> compliance with data protection laws. For further information on how
> Accenture processes your personal data, please see our privacy statement at
> https://www.accenture.com/us-en/privacy-policy.
>
> __
>
> www.accenture.com
>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re:[ANNOUNCE] Apache Spark 3.5.1 released

2024-02-28 Thread beliefer
Congratulations!







At 2024-02-28 17:43:25, "Jungtaek Lim"  wrote:

Hi everyone,


We are happy to announce the availability of Spark 3.5.1!

Spark 3.5.1 is a maintenance release containing stability fixes. This
release is based on the branch-3.5 maintenance branch of Spark. We strongly
recommend all 3.5 users to upgrade to this stable release.

To download Spark 3.5.1, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-5-1.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Jungtaek Lim



ps. Yikun is helping us through releasing the official docker image for Spark 
3.5.1 (Thanks Yikun!) It may take some time to be generally available.



[ANNOUNCE] Apache Spark 3.5.1 released

2024-02-28 Thread Jungtaek Lim
Hi everyone,

We are happy to announce the availability of Spark 3.5.1!

Spark 3.5.1 is a maintenance release containing stability fixes. This
release is based on the branch-3.5 maintenance branch of Spark. We strongly
recommend all 3.5 users to upgrade to this stable release.

To download Spark 3.5.1, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-5-1.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Jungtaek Lim

ps. Yikun is helping us through releasing the official docker image for
Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally available.


Re: [Spark Core] Potential bug in JavaRDD#countByValue

2024-02-27 Thread Mich Talebzadeh
Hi,

Quick observations from what you have provided:

- The observed discrepancy between rdd.count() and
rdd.map(Item::getType).countByValue() in distributed mode suggests a
potential aggregation issue with countByValue(). The correct results in
local mode give credence to this theory.
- Workarounds using mapToPair() and reduceByKey() produce identical
results, indicating a broader pattern rather than method-specific behaviour.
- Dataset.groupBy().count() yields accurate results, but this method incurs
overhead for the RDD-to-Dataset conversion.

Your expected total count of 75187 is around 7 times larger than the
observed count of 10519, which matches your number of executors (7). This
suggests potentially incorrect or partial aggregation across executors.

Now, before raising a red flag, these could be the culprits:

- Data skew: an uneven distribution of data across executors could cause
partial aggregation if a single executor processes most items of a
particular type.
- Partial aggregations: Spark might be combining partial counts from
executors incorrectly, leading to inaccuracies.
- Finally, a bug in 3.5 is possible.
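
One way to narrow this down is to cross-check the RDD-level countByValue()
against a DataFrame aggregation over the same data. A minimal PySpark sketch
of that cross-check (synthetic stand-in data; the real job would do the
equivalent from its JavaRDD of Item objects):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("countByValue-check").getOrCreate()

# Synthetic stand-in for the RDD of (type, payload) pairs.
rdd = spark.sparkContext.parallelize(
    [("TypeA", i) for i in range(10)] + [("TypeB", i) for i in range(20)])

# RDD-side tally per type.
rdd_counts = dict(rdd.map(lambda x: x[0]).countByValue())

# DataFrame-side tally per type, for comparison.
df_counts = {r["type"]: r["count"]
             for r in spark.createDataFrame(rdd, ["type", "value"])
                           .groupBy("type").count().collect()}

print(rdd_counts)
print(df_counts)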

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Tue, 27 Feb 2024 at 19:02, Stuart Fehr  wrote:

> Hello, I recently encountered a bug with the results from
> JavaRDD#countByValue that does not reproduce when running locally. For
> background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0.
>
> The code in question is something like this:
>
> JavaRDD rdd = // ...
>> rdd.count();  // 75187
>
>
>
> // Get the count broken down by type
>> rdd.map(Item::getType).countByValue();
>
>
> Which gives these results from the resulting Map:
>
> TypeA: 556
> TypeB: 9168
> TypeC: 590
> TypeD: 205
> (total: 10519)
>
> These values are incorrect, since every item has a type defined, so the
> total of all the types should be 75187. When I inspected this stage in the
> Spark UI, I found that it was using 7 executors. Since the value here is
> about 1/7th of the actual expected value, I suspect that there is some
> issue with the way that the executors report their results back to the
> driver. These results for the same code are correct when I run the job in
> local mode ("local[4]"), so it may also have something to do with how data
> is shared across processes.
>
> For workarounds, I have also tried:
>
> rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();
>> rdd.mapToPair(item -> Tuple2.apply(item.getType(),
>> 1L)).reduceByKey(Long::sum).collectAsMap();
>
>
> These yielded the same (incorrect) result.
>
> I did find that using Dataset.groupBy().count() did yield the correct
> results:
>
> TypeA: 3996
> TypeB: 65490
> TypeC: 4224
> TypeD: 1477
>
> So, I have an immediate workaround, but it is somewhat awkward since I
> have to create a Dataframe from a JavaRDD each time.
>
> Am I doing something wrong? Do these methods not work the way that I
> expected them to from reading the documentation? Is this a legitimate bug?
>
> I would be happy to provide more details if that would help in debugging
> this scenario.
>
> Thank you for your time,
> ~Stuart Fehr
>


[Spark Core] Potential bug in JavaRDD#countByValue

2024-02-27 Thread Stuart Fehr
Hello, I recently encountered a bug with the results from
JavaRDD#countByValue that does not reproduce when running locally. For
background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0.

The code in question is something like this:

JavaRDD<Item> rdd = // ...
> rdd.count();  // 75187



// Get the count broken down by type
> rdd.map(Item::getType).countByValue();


Which gives these results from the resulting Map:

TypeA: 556
TypeB: 9168
TypeC: 590
TypeD: 205
(total: 10519)

These values are incorrect, since every item has a type defined, so the
total of all the types should be 75187. When I inspected this stage in the
Spark UI, I found that it was using 7 executors. Since the value here is
about 1/7th of the actual expected value, I suspect that there is some
issue with the way that the executors report their results back to the
driver. These results for the same code are correct when I run the job in
local mode ("local[4]"), so it may also have something to do with how data
is shared across processes.

For workarounds, I have also tried:

rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();
> rdd.mapToPair(item -> Tuple2.apply(item.getType(),
> 1L)).reduceByKey(Long::sum).collectAsMap();


These yielded the same (incorrect) result.

I did find that using Dataset.groupBy().count() did yield the correct
results:

TypeA: 3996
TypeB: 65490
TypeC: 4224
TypeD: 1477

So, I have an immediate workaround, but it is somewhat awkward since I have
to create a Dataframe from a JavaRDD each time.

Am I doing something wrong? Do these methods not work the way that I
expected them to from reading the documentation? Is this a legitimate bug?

I would be happy to provide more details if that would help in debugging
this scenario.

Thank you for your time,
~Stuart Fehr


Re: Issue of spark with antlr version

2024-02-27 Thread Bjørn Jørgensen
[SPARK-44366][BUILD] Upgrade antlr4 to 4.13.1



On Tue, Feb 27, 2024 at 13:25, Sahni, Ashima wrote:

> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using spring framrwork 5.3.31 .To upgrade it
> to 6.x , as per application dependency we must upgrade Spark and
> Hibernate jars as well .
>
> With Hibernate compatible upgrade, the dependent Antlr4 jar version has
> been upgraded to 4.10.1 but there’s no Spark version available with the
> upgraded Antlr4 jar.
>
> Can u please update when we can have updated version with upgraded antl4
> version..
>
>
>
>
>
> Regards,
>
> Parul
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Where allowed
> by local law, electronic communications with Accenture and its affiliates,
> including e-mail and instant messaging (including content), may be scanned
> by our systems for the purposes of information security, AI-powered support
> capabilities, and assessment of internal compliance with Accenture policy.
> Your privacy is important to us. Accenture uses your personal data only in
> compliance with data protection laws. For further information on how
> Accenture processes your personal data, please see our privacy statement at
> https://www.accenture.com/us-en/privacy-policy.
>
> __
>
> www.accenture.com
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Issue of spark with antlr version

2024-02-27 Thread Mich Talebzadeh
Hi,

You have provided little information about where Spark fits in here. So I
am guessing :)

Data Source (JSON, XML, log file, etc.) -->
Preprocessing (Spark jobs for filtering, cleaning, etc.)? -->
Antlr Parser (Generated tool) -->
Extracted Data (Mapped to model) -->
Spring Data Model (Java objects) -->
Spring Application Logic (Controllers, Services, Repositories)

etc. Is this a good guess?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Tue, 27 Feb 2024 at 12:25, Sahni, Ashima
 wrote:

> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using spring framrwork 5.3.31 .To upgrade it
> to 6.x , as per application dependency we must upgrade Spark and
> Hibernate jars as well .
>
> With Hibernate compatible upgrade, the dependent Antlr4 jar version has
> been upgraded to 4.10.1 but there’s no Spark version available with the
> upgraded Antlr4 jar.
>
> Can u please update when we can have updated version with upgraded antl4
> version..
>
>
>
>
>
> Regards,
>
> Parul
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Where allowed
> by local law, electronic communications with Accenture and its affiliates,
> including e-mail and instant messaging (including content), may be scanned
> by our systems for the purposes of information security, AI-powered support
> capabilities, and assessment of internal compliance with Accenture policy.
> Your privacy is important to us. Accenture uses your personal data only in
> compliance with data protection laws. For further information on how
> Accenture processes your personal data, please see our privacy statement at
> https://www.accenture.com/us-en/privacy-policy.
>
> __
>
> www.accenture.com
>


RE: Issue of spark with antlr version

2024-02-27 Thread Sahni, Ashima
Hi Team,

Can you please let us know the update on below.

Thanks,
Ashima

From: Chawla, Parul 
Sent: Sunday, February 25, 2024 11:57 PM
To: user@spark.apache.org
Cc: Sahni, Ashima ; Misra Parashar, Jyoti 

Subject: Issue of spark with antlr version

Hi Spark Team,


Our application is currently using Spring Framework 5.3.31. To upgrade it to
6.x, per application dependencies we must upgrade the Spark and Hibernate jars
as well.
With the Hibernate-compatible upgrade, the dependent Antlr4 jar version has been
upgraded to 4.10.1, but there is no Spark version available with the upgraded
Antlr4 jar.
Can you please let us know when a Spark version with the upgraded Antlr4
version will be available?


Regards,
Parul



This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security, AI-powered support capabilities, and assessment of 
internal compliance with Accenture policy. Your privacy is important to us. 
Accenture uses your personal data only in compliance with data protection laws. 
For further information on how Accenture processes your personal data, please 
see our privacy statement at https://www.accenture.com/us-en/privacy-policy.
__

www.accenture.com


Unsubscribe

2024-02-27 Thread benson fang
Unsubscribe

Regards


Re: Bugs with joins and SQL in Structured Streaming

2024-02-27 Thread Andrzej Zera
Hi,

Yes, I tested all of them on spark 3.5.

Regards,
Andrzej


On Mon, Feb 26, 2024 at 23:24, Mich Talebzadeh wrote:

> Hi,
>
> These are all on spark 3.5, correct?
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
> View my LinkedIn profile
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand
> expert opinions" (Wernher von Braun).
>
>
> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera  wrote:
>
>> Hey all,
>>
>> I've been using Structured Streaming in production for almost a year
>> already and I want to share the bugs I found in this time. I created a test
>> for each of the issues and put them all here:
>> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>
>> I split the issues into three groups: outer joins on event time, interval
>> joins and Spark SQL.
>>
>> Issues related to outer joins:
>>
>>- When joining three or more input streams on event time, if two or
>>more streams don't contain an event for a join key (which is event time),
>>no row will be output even if other streams contain an event for this join
>>key. Tests that check for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
>>and
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>- When joining aggregated stream with raw events with a stream with
>>already aggregated events (aggregation made outside of Spark), then no row
>>will be output if that second stream don't contain a corresponding event.
>>Test that checks for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>>- When joining two aggregated streams (aggregated in Spark), no
>>result is produced. Test that checks for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
>>I've already reported this one here:
>>https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been
>>handled yet.
>>
>> Issues related to interval joins:
>>
>>- When joining three streams (A, B, C) using interval join on event
>>time, in the way that B.eventTime is conditioned on A.eventTime and
>>C.eventTime is also conditioned on A.eventTime, and then doing window
>>aggregation based on A's event time, the result is output only after
>>watermark crosses the window end + interval(A, B) + interval (A, C).
>>However, I'd expect results to be output faster, i.e. when the watermark
>>crosses window end + MAX(interval(A, B) + interval (A, C)). If our case is
>>that event B can happen 3 minutes after event A and event C can happen 5
>>minutes after A, there is no point to suspend reporting output for 8
>>minutes (3+5) after the end of the window if we know that no more event 
>> can
>>be matched after 5 min from the window end (assuming window end is based 
>> on
>>A's event time). Test that checks for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>>
>> SQL issues:
>>
>>- WITH clause (in contrast to subquery) seems to create a static
>>DataFrame that can't be used in streaming joins. Test that checks for 
>> this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>>- Two subqueries, each aggregating data using window() functio,
>>breaks the output schema. Test that checks for this:
>>
>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>>
>> I'm a beginner with Scala (I'm using Structured Streaming with PySpark)
>> so won't be able to provide fixes. But I hope the test cases I provided can
>> be of some help.
>>
>> Regards,
>> Andrzej
>>
>


Re: Bugs with joins and SQL in Structured Streaming

2024-02-26 Thread Mich Talebzadeh
Hi,

These are all on spark 3.5, correct?

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Mon, 26 Feb 2024 at 22:18, Andrzej Zera  wrote:

> Hey all,
>
> I've been using Structured Streaming in production for almost a year
> already and I want to share the bugs I found in this time. I created a test
> for each of the issues and put them all here:
> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>
> I split the issues into three groups: outer joins on event time, interval
> joins and Spark SQL.
>
> Issues related to outer joins:
>
>- When joining three or more input streams on event time, if two or
>more streams don't contain an event for a join key (which is event time),
>no row will be output even if other streams contain an event for this join
>key. Tests that check for this:
>
> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
>and
>
> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>- When joining aggregated stream with raw events with a stream with
>already aggregated events (aggregation made outside of Spark), then no row
>will be output if that second stream don't contain a corresponding event.
>Test that checks for this:
>
> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>- When joining two aggregated streams (aggregated in Spark), no result
>is produced. Test that checks for this:
>
> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
>I've already reported this one here:
>https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been
>handled yet.
>
> Issues related to interval joins:
>
>- When joining three streams (A, B, C) using interval join on event
>time, in the way that B.eventTime is conditioned on A.eventTime and
>C.eventTime is also conditioned on A.eventTime, and then doing window
>aggregation based on A's event time, the result is output only after
>watermark crosses the window end + interval(A, B) + interval (A, C).
>However, I'd expect results to be output faster, i.e. when the watermark
>crosses window end + MAX(interval(A, B) + interval (A, C)). If our case is
>that event B can happen 3 minutes after event A and event C can happen 5
>minutes after A, there is no point to suspend reporting output for 8
>minutes (3+5) after the end of the window if we know that no more event can
>be matched after 5 min from the window end (assuming window end is based on
>A's event time). Test that checks for this:
>
> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>
> SQL issues:
>
>- WITH clause (in contrast to subquery) seems to create a static
>DataFrame that can't be used in streaming joins. Test that checks for this:
>
> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>- Two subqueries, each aggregating data using window() functio, breaks
>the output schema. Test that checks for this:
>
> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>
> I'm a beginner with Scala (I'm using Structured Streaming with PySpark) so
> won't be able to provide fixes. But I hope the test cases I provided can be
> of some help.
>
> Regards,
> Andrzej
>


Bugs with joins and SQL in Structured Streaming

2024-02-26 Thread Andrzej Zera
Hey all,

I've been using Structured Streaming in production for almost a year
already and I want to share the bugs I found in this time. I created a test
for each of the issues and put them all here:
https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala

I split the issues into three groups: outer joins on event time, interval
joins and Spark SQL.

Issues related to outer joins:

   - When joining three or more input streams on event time, if two or more
   streams don't contain an event for a join key (which is event time), no row
   will be output even if other streams contain an event for this join key.
   Tests that check for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
   and
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
   - When joining a stream aggregated from raw events with a stream of
   already aggregated events (aggregation done outside of Spark), no row
   will be output if that second stream doesn't contain a corresponding event.
   Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
   - When joining two aggregated streams (aggregated in Spark), no result
   is produced. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
   I've already reported this one here:
   https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been
   handled yet.

Issues related to interval joins:

   - When joining three streams (A, B, C) using interval join on event
   time, in the way that B.eventTime is conditioned on A.eventTime and
   C.eventTime is also conditioned on A.eventTime, and then doing window
   aggregation based on A's event time, the result is output only after
   watermark crosses the window end + interval(A, B) + interval (A, C).
   However, I'd expect results to be output faster, i.e. when the watermark
   crosses window end + MAX(interval(A, B) + interval (A, C)). If our case is
   that event B can happen 3 minutes after event A and event C can happen 5
   minutes after A, there is no point to suspend reporting output for 8
   minutes (3+5) after the end of the window if we know that no more event can
   be matched after 5 min from the window end (assuming window end is based on
   A's event time). Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32

SQL issues:

   - WITH clause (in contrast to subquery) seems to create a static
   DataFrame that can't be used in streaming joins. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
   - Two subqueries, each aggregating data using the window() function, break
   the output schema. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122

I'm a beginner with Scala (I'm using Structured Streaming with PySpark) so
won't be able to provide fixes. But I hope the test cases I provided can be
of some help.
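
For readers less familiar with the setup these tests exercise, below is a
minimal PySpark sketch of a two-stream left outer join on event time,
following the pattern from the Spark documentation. It uses rate sources as
stand-ins for real inputs, with arbitrary watermark and interval values; it
is a baseline illustration only, not a reproduction of the issues above.

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("stream-stream-left-outer-join").getOrCreate()

# Two rate sources stand in for real event streams.
impressions = (spark.readStream.format("rate").option("rowsPerSecond", 1).load()
               .selectExpr("value AS ad_id", "timestamp AS impressionTime")
               .withWatermark("impressionTime", "10 seconds"))

clicks = (spark.readStream.format("rate").option("rowsPerSecond", 1).load()
          .selectExpr("value AS click_ad_id", "timestamp AS clickTime")
          .withWatermark("clickTime", "10 seconds"))

# Stream-stream left outer join: both sides need watermarks, and the join
# condition needs an event-time range so state can eventually be dropped.
joined = impressions.join(
    clicks,
    expr("""
        ad_id = click_ad_id AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 minute
    """),
    "leftOuter")

query = joined.writeStream.format("console").outputMode("append").start()
# query.awaitTermination()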

Regards,
Andrzej


Re: Bintray replacement for spark-packages.org

2024-02-25 Thread Richard Eggert
I've been trying to obtain clarification on the terms of use regarding
repo.spark-packages.org. I emailed feedb...@spark-packages.org two weeks
ago, but have not heard back. Whom should I contact?

On Mon, Apr 26, 2021 at 8:13 AM Bo Zhang  wrote:

> Hi Apache Spark users,
>
> As you might know, Bintray, which is the repository service used for
> spark-packages.org, is in its sunset process. There was a planned
> brown-out on April 12th
>  and there will be
> another one on April 26th
> , and it will no
> longer be available from May 1st.
>
> We have spun up a new repository service at
> https://repos.spark-packages.org and it will be the new home for the
> artifacts on spark-packages.
>
> Given the planned Bintray brown-out, this is a good time for us to test
> the new repository service. To consume artifacts from that, please replace "
> dl.bintray.com/spark-packages/maven" with "repos.spark-packages.org" in
> the Maven pom files or sbt build files in your repositories, e.g.:
> https://github.com/apache/spark/pull/32346
>
> We are still working on the release process to the new repository service,
> and will provide an update here shortly.
>
> If you have any questions for using the new repository service, or any
> general questions for spark-packages, please reach out to
> feedb...@spark-packages.org.
>
> Thanks,
> Bo
>


-- 
Rich


Issue of spark with antlr version

2024-02-25 Thread Chawla, Parul
Hi Spark Team,


Our application is currently using spring framrwork 5.3.31 .To upgrade it to 
6.x , as per application dependency we must upgrade Spark and Hibernate jars as 
well .
With Hibernate compatible upgrade, the dependent Antlr4 jar version has been 
upgraded to 4.10.1 but there's no Spark version available with the upgraded 
Antlr4 jar.
Can u please update when we can have updated version with upgraded antl4 
version..


Regards,
Parul



This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security, AI-powered support capabilities, and assessment of 
internal compliance with Accenture policy. Your privacy is important to us. 
Accenture uses your personal data only in compliance with data protection laws. 
For further information on how Accenture processes your personal data, please 
see our privacy statement at https://www.accenture.com/us-en/privacy-policy.
__

www.accenture.com


unsubscribe

2024-02-24 Thread Ameet Kini



Re: job uuid not unique

2024-02-24 Thread Xin Zhang
unsubscribe

On Sat, Feb 17, 2024 at 3:04 AM Рамик И  wrote:

>
> Hi
> I'm using Spark Streaming to read from Kafka and write to S3. Sometimes I
> get errors when writing org.apache.hadoop.fs.FileAlreadyExistsException.
>
> Spark version: 3.5.0
> scala version : 2.13.8
> Cluster: k8s
>
> libraryDependencies
> org.apache.hadoop.hadoop-aws3.3.4
> com.amazonaws.aws-java-sdk-s31.12.600
>
>
>
> code:
> df
> .coalesce(1)
> .write
> .option("fs.s3a.committer.require.uuid", "true")
>  .option("fs.s3a.committer.generate.uuid", "true")
> .option("fs.s3a.committer.name", "magic")
> .option("fs.s3a.committer.magic.enabled", "true")
>  .option("orc.compress", "zlib")
>  .mode(SaveMode.Append)
> .orc(path)
>
>
>
> executor 9
>
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Job UUID
> 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using committer magic
> to output data to s3a://mybucket/test
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using Committer
> MagicCommitter{AbstractS3ACommitter{role=Task committer
> attempt_202402161305112153373254688311399_0367_m_00_13217, name=magic,
> outputPath=s3a://mybucket/test,
> workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_00_13217/__base,
> uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid
> source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:25 INFO SQLHadoopMapReduceCommitProtocol: Using output
> committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Starting: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13217
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13217: duration
> 0:00.061s
> 24/02/16 13:05:25 ERROR Executor: Exception in task 0.2 in stage 367.1
> (TID 13217)
> org.apache.hadoop.fs.FileAlreadyExistsException:
> s3a://mybucket/test/part-0-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc
> already exists
>
>
> executor 10
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Job UUID
> 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using committer magic
> to output data to s3a://mybucket/test
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using Committer
> MagicCommitter{AbstractS3ACommitter{role=Task committer
> attempt_202402161305112153373254688311399_0367_m_00_13216, name=magic,
> outputPath=s3a://mybucket/test,
> workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_00_13216/__base,
> uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid
> source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:24 INFO SQLHadoopMapReduceCommitProtocol: Using output
> committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Starting: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13216
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13216: duration
> 0:00.112s
> 24/02/16 13:05:24 ERROR Executor: Exception in task 0.1 in stage 367.1
> (TID 13216)
> org.apache.hadoop.fs.FileAlreadyExistsException:
> s3a://mybucket/test/part-0-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc
> already exists
>
>
>
> how can I fix it ?
>


-- 
Zhang Xin(张欣)
Email:josseph.zh...@gmail.com


Re: AQE coalesce 60G shuffle data into a single partition

2024-02-24 Thread Enrico Minack

Hi Shay,

maybe this is related to the small number of output rows (1,250) of the
last exchange step that consumes those 60GB of shuffle data.

Looks like your outer transformation is something like

df.groupBy($"id").agg(collect_list($"prop_name"))

Have you tried adding a repartition as an attempt to convince AQE to
exchange into a specific number of partitions?

df.groupBy($"id").agg(collect_list($"prop_name")).repartition(100, $"id")

Can you provide some Spark code that reproduces the issue with synthetic
data and cleansed Spark logic?

Cheers,
Enrico


On 22.02.24 at 15:14, Shay Elbaz wrote:

Dear community,

We have noticed that AQE is coalescing a substantial amount of data
(approximately 60GB) into a single partition during query execution.
This behavior is unexpected given the absence of data skew, broadcast,
and the significant size of the shuffle operation.

*Environment Details:*

 * Apache Spark Version: 3.1.3
 * Platform: Dataproc 2.0
 * Executors Configuration: 90GB memory, 15 cores

*Configuration Parameters:* We have examined the relevant
configuration parameters, and tried many different variations, but the
behavior persists. For example:
spark.sql.adaptive.advisoryPartitionSizeInBytes=104857600 //100MB
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000
spark.sql.adaptive.coalescePartitions.minPartitionNum=500
spark.sql.optimizer.dynamicPartitionPruning.enabled=false
spark.sql.autoBroadcastJoinThreshold=-1 // off

*The full plan and diagram from the SQL tab are shown below*

Please advise:

 1. Are there additional configuration parameters or best practices we
    should be aware of in such scenarios?
 2. Are there known related issues in 3.1.3? (didn't find any on Jira)


Thanks in advance,
Shay


...


Re: [Beginner Debug]: Executor OutOfMemoryError

2024-02-23 Thread Mich Talebzadeh
Seems like you are having memory issues. Examine your settings.

   1. It appears that your driver memory setting is too high. It should
   be a fraction of the total memory provided by YARN.
   2. Use the Spark UI to monitor the job's memory consumption. Check the
   Storage tab to see how memory is being utilized across caches, data, and
   shuffle.
   3. Check the Executors tab to identify tasks or executors that are
   experiencing memory issues. Look for tasks with high input sizes or shuffle
   spills.
   4. In YARN mode, consider setting the spark.executor.memoryOverhead property
   to handle executor overhead. This is important for tasks that require
   additional memory beyond the executor memory setting, for example:
   --conf spark.executor.memoryOverhead=1000
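
A minimal PySpark sketch of how the executor-side settings above can also be
expressed on the session builder; the values are placeholders, not
recommendations:

from pyspark.sql import SparkSession

# Note: driver memory normally has to be set on the spark-submit command line,
# because the driver JVM is already running by the time this code executes.
spark = (SparkSession.builder
         .appName("memory-tuning-sketch")
         .config("spark.executor.memory", "16g")
         .config("spark.executor.memoryOverhead", "2g")
         .getOrCreate())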

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Fri, 23 Feb 2024 at 02:42, Shawn Ligocki  wrote:

> Hi I'm new to Spark and I'm running into a lot of OOM issues while trying
> to scale up my first Spark application. I am running into these issues with
> only 1% of the final expected data size. Can anyone help me understand how
> to properly configure Spark to use limited memory or how to debug which
> part of my application is causing so much memory trouble?
>
> My logs end up with tons of messages like:
>
> 24/02/22 10:51:01 WARN TaskMemoryManager: Failed to allocate a page
>> (134217728 bytes), try again.
>> 24/02/22 10:51:01 WARN RowBasedKeyValueBatch: Calling spill() on
>> RowBasedKeyValueBatch. Will not spill but return 0.
>> 24/02/22 10:52:28 WARN Executor: Issue communicating with driver in
>> heartbeater
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [1
>> milliseconds]. This timeout is controlled by
>> spark.executor.heartbeatInterval
>> ...
>> 24/02/22 10:58:17 WARN NettyRpcEnv: Ignored message:
>> HeartbeatResponse(false)
>> 24/02/22 10:58:17 WARN HeartbeatReceiver: Removing executor driver with
>> no recent heartbeats: 207889 ms exceeds timeout 12 ms
>> 24/02/22 10:58:17 ERROR Executor: Exception in task 175.0 in stage 2.0
>> (TID 676)
>> java.lang.OutOfMemoryError: Java heap space
>> ...
>
>
> Background: The goal of this application is to load a large number of
> parquet files, group by a couple fields and compute some summarization
> metrics for each group and write the result out. In Python basically:
>
> from pyspark.sql import SparkSession
>> import pyspark.sql.functions as func
>
>
>> spark = SparkSession.builder.getOrCreate()
>> df = spark.read.parquet(*pred_paths)
>> df = df.groupBy("point_id", "species_code").agg(
>>   func.count("pred_occ").alias("ensemble_support"))
>> df.write.parquet(output_path)
>
>
> And I am launching it with:
>
> spark-submit \
>>   --name ensemble \
>>   --driver-memory 64g --executor-memory 64g \
>>   stem/ensemble_spark.py
>
>
> I noticed that increasing --driver-memory and --executor-memory did help
> me scale up somewhat, but I cannot increase those forever.
>
> Some details:
>
>- All my tests are currently on a single cluster node (with 128GB RAM
>& 64 CPU cores) or locally on my laptop (32GB RAM & 12 CPU cores).
>Eventually, I expect to run this in parallel on the cluster.
>- This is running on Spark 3.0.1 (in the cluster), I'm seeing the same
>issues with 3.5 on my laptop.
>- The input data is tons of parquet files stored on NFS. For the final
>application it will be about 50k parquet files ranging in size up to 15GB
>each. Total size of 100TB, 4 trillion rows, 5 columns. I am currently
>testing with ~1% this size: 500 files, 1TB total, 40B rows total.
>- There should only be a max of 100 rows per group. So I expect an
>output size somewhere in the range 1-5TB, 40-200B rows. For the test: 50GB,
>2B rows. These output files are also written to NFS.
>- The rows for the same groups are not near each other. Ex: no single
>parquet file will have any two rows for the same group.
>
> Here are some questions I have:
>
>1. Does Spark know how much memory is available? Do I need to tell it
>somehow? Is there other configuration that I should set up for a run like
>this? I know that 1TB input data is too much to fit in memory, but I
>assumed that Spark would work on it in small enough batches to fit. Do I
>need to configure those batches somehow?
>2. How can I debug what is causing it to OOM?
>3. Does this have something to do with the fact that I'm loading the
>  

[Beginner Debug]: Executor OutOfMemoryError

2024-02-22 Thread Shawn Ligocki
Hi I'm new to Spark and I'm running into a lot of OOM issues while trying
to scale up my first Spark application. I am running into these issues with
only 1% of the final expected data size. Can anyone help me understand how
to properly configure Spark to use limited memory or how to debug which
part of my application is causing so much memory trouble?

My logs end up with tons of messages like:

24/02/22 10:51:01 WARN TaskMemoryManager: Failed to allocate a page
> (134217728 bytes), try again.
> 24/02/22 10:51:01 WARN RowBasedKeyValueBatch: Calling spill() on
> RowBasedKeyValueBatch. Will not spill but return 0.
> 24/02/22 10:52:28 WARN Executor: Issue communicating with driver in
> heartbeater
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [1
> milliseconds]. This timeout is controlled by
> spark.executor.heartbeatInterval
> ...
> 24/02/22 10:58:17 WARN NettyRpcEnv: Ignored message:
> HeartbeatResponse(false)
> 24/02/22 10:58:17 WARN HeartbeatReceiver: Removing executor driver with no
> recent heartbeats: 207889 ms exceeds timeout 12 ms
> 24/02/22 10:58:17 ERROR Executor: Exception in task 175.0 in stage 2.0
> (TID 676)
> java.lang.OutOfMemoryError: Java heap space
> ...


Background: The goal of this application is to load a large number of
parquet files, group by a couple fields and compute some summarization
metrics for each group and write the result out. In Python basically:

from pyspark.sql import SparkSession
> import pyspark.sql.functions as func


> spark = SparkSession.builder.getOrCreate()
> df = spark.read.parquet(*pred_paths)
> df = df.groupBy("point_id", "species_code").agg(
>   func.count("pred_occ").alias("ensemble_support"))
> df.write.parquet(output_path)


And I am launching it with:

spark-submit \
>   --name ensemble \
>   --driver-memory 64g --executor-memory 64g \
>   stem/ensemble_spark.py


I noticed that increasing --driver-memory and --executor-memory did help me
scale up somewhat, but I cannot increase those forever.

Some details:

   - All my tests are currently on a single cluster node (with 128GB RAM &
   64 CPU cores) or locally on my laptop (32GB RAM & 12 CPU cores).
   Eventually, I expect to run this in parallel on the cluster.
   - This is running on Spark 3.0.1 (in the cluster), I'm seeing the same
   issues with 3.5 on my laptop.
   - The input data is tons of parquet files stored on NFS. For the final
   application it will be about 50k parquet files ranging in size up to 15GB
   each. Total size of 100TB, 4 trillion rows, 5 columns. I am currently
   testing with ~1% this size: 500 files, 1TB total, 40B rows total.
   - There should only be a max of 100 rows per group. So I expect an
   output size somewhere in the range 1-5TB, 40-200B rows. For the test: 50GB,
   2B rows. These output files are also written to NFS.
   - The rows for the same groups are not near each other. Ex: no single
   parquet file will have any two rows for the same group.

Here are some questions I have:

   1. Does Spark know how much memory is available? Do I need to tell it
   somehow? Is there other configuration that I should set up for a run like
   this? I know that 1TB input data is too much to fit in memory, but I
   assumed that Spark would work on it in small enough batches to fit. Do I
   need to configure those batches somehow?
   2. How can I debug what is causing it to OOM?
   3. Does this have something to do with the fact that I'm loading the
   data from Parquet files? Or that I'm loading so many different files? Or
   that I'm loading them from NFS?
   4. Do I need to configure the reduce step (group and aggregation)
   differently because of the type of data I have (large numbers of groups,
   stratified groups)?
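
On question 1 above: the number of shuffle partitions used by the groupBy
defaults to a fixed value (spark.sql.shuffle.partitions, 200 by default),
which may be worth raising so that each aggregation task holds less state.
A hedged sketch only, with placeholder paths and a placeholder value, not a
confirmed fix:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func

spark = (SparkSession.builder
         .appName("ensemble")
         # Placeholder value: more shuffle partitions means smaller per-task state.
         .config("spark.sql.shuffle.partitions", "2000")
         .getOrCreate())

pred_paths = ["/nfs/preds/*.parquet"]   # placeholder input paths
output_path = "/nfs/output/ensemble"    # placeholder output path

df = spark.read.parquet(*pred_paths)
df = (df.groupBy("point_id", "species_code")
        .agg(func.count("pred_occ").alias("ensemble_support")))
df.write.parquet(output_path)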

Thank you!
-Shawn Ligocki


Re: unsubscribe

2024-02-21 Thread Xin Zhang
unsubscribe

On Tue, Feb 20, 2024 at 9:44 PM kritika jain  wrote:

> Unsubscribe
>
> On Tue, 20 Feb 2024, 3:18 pm Крюков Виталий Семенович,
>  wrote:
>
>>
>> unsubscribe
>>
>>
>>

-- 
Zhang Xin(张欣)
Email:josseph.zh...@gmail.com


Re: Spark 4.0 Query Analyzer Bug Report

2024-02-21 Thread Mich Talebzadeh
Valid points raised indeed, including the potential typo in the Spark
version. In the meantime, I suggest you look at the following alternative
debugging methods:

   - Simpler explain(): try a basic explain() or explain("extended"). This
   might provide a less detailed, but potentially functional, explanation.
   - Manual analysis: analyze the query structure and logical steps
   yourself.
   - Spark UI: review the Spark UI (accessible through your Spark
   application on port 4040) to delve into query execution and potential
   bottlenecks.
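
A tiny sketch of the simpler explain() calls mentioned above, against an
arbitrary DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("explain-demo").getOrCreate()

# Any DataFrame will do; this one just groups a generated range.
df = spark.range(10).groupBy((col("id") % 3).alias("k")).count()

df.explain()            # physical plan only
df.explain("extended")  # parsed, analyzed, optimized and physical plans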


HTH



Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Wed, 21 Feb 2024 at 08:37, Holden Karau  wrote:

> Do you mean Spark 3.4? 4.0 is very much not released yet.
>
> Also it would help if you could share your query & more of the logs
> leading up to the error.
>
> On Tue, Feb 20, 2024 at 3:07 PM Sharma, Anup 
> wrote:
>
>> Hi Spark team,
>>
>>
>>
>> We ran into a dataframe issue after upgrading from spark 3.1 to 4.
>>
>>
>>
>> query_result.explain(extended=True)\n  File
>> \"…/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py\"
>>
>> raise Py4JJavaError(\npy4j.protocol.Py4JJavaError: An error occurred while 
>> calling z:org.apache.spark.sql.api.python.PythonSQLUtils.explainString.\n: 
>> java.lang.IllegalStateException: You hit a query analyzer bug. Please report 
>> your query to Spark user mailing list.\n\tat 
>> org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:516)\n\tat
>>  
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)\n\tat
>>  scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)\n\tat 
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)\n\tat 
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat 
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)\n\tat
>>  
>> org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)\n\tat
>>  
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)\n\tat
>>  
>> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)\n\tat
>>  
>> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)\n\tat
>>  scala.collection.Iterator.foreach(Iterator.scala:943)\n\tat 
>> scala.collection.Iterator.foreach$(Iterator.scala:943)\n\tat 
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1431)\n\tat 
>> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)\n\tat 
>> scala.collect...
>>
>>
>>
>>
>>
>> Could you please let us know if this is already being looked at?
>>
>>
>>
>> Thanks,
>>
>> Anup
>>
>
>
> --
> Cell : 425-233-8271
>


Kafka-based Spark Streaming and Vertex AI for Sentiment Analysis

2024-02-21 Thread Mich Talebzadeh
I am working on a pet project to implement a real-time sentiment analysis
system for analyzing customer reviews. It leverages Kafka for data
ingestion, Spark Structured Streaming (SSS) for real-time processing, and
Vertex AI for sentiment analysis and potential action triggers.

*Features*

   - Real-time processing of customer reviews using SSS.
   - Sentiment analysis using pre-assigned labels or Vertex AI
   models.
   - Integration with Vertex AI for model deployment and prediction serving.
   - Potential actions based on sentiment analysis results
   (e.g., notifications, database updates).


*Tech stack*

   - Kafka: Stream processing platform for data ingestion.
   - SSS for real-time data processing on incoming messages with cleansing
   - Vertex AI: Machine learning platform for model training


I have created sample JSON data with relevant attributes for a product review,
as shown below:

{
  "rowkey": "7de43681-0e4a-45cb-ad40-5f14f5678333",
  "product_id": "product-id-1616",
  "timereported": "2024-02-21T08:46:40",
  "description": "Easy to use and setup, perfect for beginners.",
  "price": GBP507,
  "sentiment": negative,
  "product_category": "Electronics",
  "customer_id": "customer4",
  "location": "UK",
  "rating": 6,
  "review_text": "Sleek and modern design, but lacking some features.",
  "user_feedback": "Negative",
  "review_source": "online",
  "sentiment_confidence": 0.33,
  "product_features": "user-friendly",
  "timestamp": "",
  "language": "English"
},
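
As a minimal sketch of the SSS side (the broker address, topic name and
console sink are placeholders, the schema only covers a subset of the
attributes above, and it assumes the spark-sql-kafka connector package is on
the classpath):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, FloatType)

spark = SparkSession.builder.appName("ReviewSentiment").getOrCreate()

# Subset of the sample review attributes shown above
schema = StructType([
    StructField("rowkey", StringType()),
    StructField("product_id", StringType()),
    StructField("timereported", StringType()),
    StructField("review_text", StringType()),
    StructField("sentiment", StringType()),
    StructField("sentiment_confidence", FloatType()),
    StructField("rating", IntegerType()),
])

# Ingest raw review events from Kafka
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")
       .option("subscribe", "product_reviews")
       .load())

# Parse the JSON payload and flatten it into columns
reviews = (raw.selectExpr("CAST(value AS STRING) AS json")
           .select(from_json(col("json"), schema).alias("r"))
           .select("r.*"))

# Cleansing and the call out to Vertex AI would slot in here; the console
# sink is just for testing the pipeline end to end
(reviews.writeStream
 .format("console")
 .outputMode("append")
 .start()
 .awaitTermination())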

I have also attached a high-level diagram. There has recently been demand for
Gemini usage as well. Your views are appreciated.


Thanks

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* I am an architect and not a data scientist. The information
provided is correct to the best of my knowledge but of course cannot be
guaranteed . It is essential to note that, as with any advice, quote "one test
result is worth one-thousand expert opinions (Werner
Von Braun
)".

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

[ANNOUNCE] Apache Kyuubi 1.8.1 is available

2024-02-20 Thread Cheng Pan
Hi all,

The Apache Kyuubi community is pleased to announce that
Apache Kyuubi 1.8.1 has been released!

Apache Kyuubi is a distributed and multi-tenant gateway to provide
serverless SQL on data warehouses and lakehouses.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC and
RESTful interfaces for end-users to manipulate large-scale data with
pre-programmed and extensible Spark/Flink/Trino/Hive engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and lakehouses.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark/Flink/Trino/Hive engines on the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.8.1.html

To learn more about Apache Kyuubi, please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community
who made this release possible!

Thanks,
Cheng Pan, on behalf of Apache Kyuubi community

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 3.3 Query Analyzer Bug Report

2024-02-20 Thread Sharma, Anup
Apologies. The issue is seen after we upgraded from Spark 3.1 to Spark 3.3.
The same query runs fine on Spark 3.1.

Please disregard the Spark version mentioned in the earlier email subject.

Anup

Error trace:
query_result.explain(extended=True)\n  File 
\"…/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py\"

raise Py4JJavaError(\npy4j.protocol.Py4JJavaError: An error occurred while 
calling z:org.apache.spark.sql.api.python.PythonSQLUtils.explainString.\n: 
java.lang.IllegalStateException: You hit a query analyzer bug. Please report 
your query to Spark user mailing list.\n\tat 
org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:516)\n\tat
 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)\n\tat
 scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)\n\tat 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)\n\tat 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)\n\tat
 
org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)\n\tat
 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)\n\tat
 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)\n\tat
 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)\n\tat
 scala.collection.Iterator.foreach(Iterator.scala:943)\n\tat 
scala.collection.Iterator.foreach$(Iterator.scala:943)\n\tat 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431)\n\tat 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)\n\tat 
scala.collect...


From: "Sharma, Anup" 
Date: Tuesday, February 20, 2024 at 4:58 PM
To: "user@spark.apache.org" 
Cc: "Thinderu, Shalini" 
Subject: Spark 4.0 Query Analyzer Bug Report

Hi Spark team,

We ran into a dataframe issue after upgrading from spark 3.1 to 4.

query_result.explain(extended=True)\n  File 
\"…/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py\"

raise Py4JJavaError(\npy4j.protocol.Py4JJavaError: An error occurred while 
calling z:org.apache.spark.sql.api.python.PythonSQLUtils.explainString.\n: 
java.lang.IllegalStateException: You hit a query analyzer bug. Please report 
your query to Spark user mailing list.\n\tat 
org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:516)\n\tat
 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)\n\tat
 scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)\n\tat 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)\n\tat 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)\n\tat
 
org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)\n\tat
 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)\n\tat
 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)\n\tat
 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)\n\tat
 scala.collection.Iterator.foreach(Iterator.scala:943)\n\tat 
scala.collection.Iterator.foreach$(Iterator.scala:943)\n\tat 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431)\n\tat 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)\n\tat 
scala.collect...


Could you please let us know if this is already being looked at?

Thanks,
Anup


Re: Spark 4.0 Query Analyzer Bug Report

2024-02-20 Thread Holden Karau
Do you mean Spark 3.4? 4.0 is very much not released yet.

Also it would help if you could share your query & more of the logs leading
up to the error.

On Tue, Feb 20, 2024 at 3:07 PM Sharma, Anup 
wrote:

> Hi Spark team,
>
>
>
> We ran into a dataframe issue after upgrading from spark 3.1 to 4.
>
>
>
> query_result.explain(extended=True)\n  File
> \"…/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py\"
>
> raise Py4JJavaError(\npy4j.protocol.Py4JJavaError: An error occurred while 
> calling z:org.apache.spark.sql.api.python.PythonSQLUtils.explainString.\n: 
> java.lang.IllegalStateException: You hit a query analyzer bug. Please report 
> your query to Spark user mailing list.\n\tat 
> org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:516)\n\tat
>  
> org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)\n\tat
>  scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)\n\tat 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)\n\tat 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)\n\tat
>  
> org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)\n\tat
>  
> org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)\n\tat
>  
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)\n\tat
>  
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)\n\tat
>  scala.collection.Iterator.foreach(Iterator.scala:943)\n\tat 
> scala.collection.Iterator.foreach$(Iterator.scala:943)\n\tat 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431)\n\tat 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)\n\tat 
> scala.collect...
>
>
>
>
>
> Could you please let us know if this is already being looked at?
>
>
>
> Thanks,
>
> Anup
>


-- 
Cell : 425-233-8271


Spark 4.0 Query Analyzer Bug Report

2024-02-20 Thread Sharma, Anup
Hi Spark team,

We ran into a dataframe issue after upgrading from spark 3.1 to 4.

query_result.explain(extended=True)\n  File 
\"…/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py\"

raise Py4JJavaError(\npy4j.protocol.Py4JJavaError: An error occurred while 
calling z:org.apache.spark.sql.api.python.PythonSQLUtils.explainString.\n: 
java.lang.IllegalStateException: You hit a query analyzer bug. Please report 
your query to Spark user mailing list.\n\tat 
org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:516)\n\tat
 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)\n\tat
 scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)\n\tat 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)\n\tat 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)\n\tat
 
org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)\n\tat
 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)\n\tat
 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)\n\tat
 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)\n\tat
 scala.collection.Iterator.foreach(Iterator.scala:943)\n\tat 
scala.collection.Iterator.foreach$(Iterator.scala:943)\n\tat 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431)\n\tat 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)\n\tat 
scala.collect...


Could you please let us know if this is already being looked at?

Thanks,
Anup


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-20 Thread Manoj Kumar
Dear @Chao Sun,

I trust you're doing well. Having worked extensively with Spark Nvidia
Rapids, Velox, and Gluten, I'm now contemplating Comet's potential
advantages over Velox in terms of performance and unique features.

While RAPIDS leverages GPUs effectively, Gazelle, with its Intel AVX-512
intrinsics, is now EOL. Now all eyes are on Velox for its universal C++
accelerators (Presto, Spark, PyTorch, XStream (stream processing), F3
(feature engineering), FBETL (data ingestion), XSQL (distributed transaction
processing), Scribe (message bus infrastructure), Saber (high QPS external
serving), and others).

In this context, I'm keen to understand Comet's distinctive features and
how its performance compares to Velox. What makes Comet stand out, and how
does its efficiency stack up against Velox across different tasks and
frameworks?

Your insights into Comet's capabilities would be invaluable; they will help
me evaluate whether I should invest my time in this plugin.

Thank you for your time and expertise.

Warm regards,
Manoj Kumar


On Tue, 20 Feb 2024 at 01:51, Mich Talebzadeh 
wrote:

> Ok thanks for your clarifications
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Mon, 19 Feb 2024 at 17:24, Chao Sun  wrote:
>
>> Hi Mich,
>>
>> > Also have you got some benchmark results from your tests that you can
>> possibly share?
>>
>> We only have some partial benchmark results internally so far. Once
>> shuffle and better memory management have been introduced, we plan to
>> publish the benchmark results (at least TPC-H) in the repo.
>>
>> > Compared to standard Spark, what kind of performance gains can be
>> expected with Comet?
>>
>> Currently, users could benefit from Comet in a few areas:
>> - Parquet read: a few improvements have been made against reading from S3
>> in particular, so users can expect better scan performance in this scenario
>> - Hash aggregation
>> - Columnar shuffle
>> - Decimals (Java's BigDecimal is pretty slow)
>>
>> > Can one use Comet on k8s in conjunction with something like a Volcano
>> addon?
>>
>> I think so. Comet is mostly orthogonal to the Spark scheduler framework.
>>
>> Chao
>>
>>
>>
>>
>>
>>
>> On Fri, Feb 16, 2024 at 5:39 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Chao,
>>>
>>> As a cool feature
>>>
>>>
>>>- Compared to standard Spark, what kind of performance gains can be
>>>expected with Comet?
>>>-  Can one use Comet on k8s in conjunction with something like a
>>>Volcano addon?
>>>
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge, sourced from both personal expertise and other resources but of
>>> course cannot be guaranteed . It is essential to note that, as with any
>>> advice, one verified and tested result holds more weight than a thousand
>>> expert opinions.
>>>
>>>
>>> On Tue, 13 Feb 2024 at 20:42, Chao Sun  wrote:
>>>
 Hi all,

 We are very happy to announce that Project Comet, a plugin to
 accelerate Spark query execution via leveraging DataFusion and Arrow,
 has now been open sourced under the Apache Arrow umbrella. Please
 check the project repo
 https://github.com/apache/arrow-datafusion-comet for more details if
 you are interested. We'd love to collaborate with people from the open
 source community who share similar goals.

 Thanks,
 Chao

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: unsubscribe

2024-02-20 Thread kritika jain
Unsubscribe

On Tue, 20 Feb 2024, 3:18 pm Крюков Виталий Семенович,
 wrote:

>
> unsubscribe
>
>
>


unsubscribe

2024-02-20 Thread Крюков Виталий Семенович

unsubscribe




Community Over Code Asia 2024 Travel Assistance Applications now open!

2024-02-20 Thread Gavin McDonald
Hello to all users, contributors and Committers!

The Travel Assistance Committee (TAC) are pleased to announce that
travel assistance applications for Community over Code Asia 2024 are now
open!

We will be supporting Community over Code Asia, Hangzhou, China
July 26th - 28th, 2024.

TAC exists to help those that would like to attend Community over Code
events, but are unable to do so for financial reasons. For more info
on this year's applications and qualifying criteria, please visit the
TAC website at < https://tac.apache.org/ >. Applications are already
open on https://tac-apply.apache.org/, so don't delay!

The Apache Travel Assistance Committee will only be accepting
applications from those people that are able to attend the full event.

Important: Applications close on Friday, May 10th, 2024.

Applicants have until the closing date above to submit their
applications (which should contain as much supporting material as
required to efficiently and accurately process their request); this
will enable TAC to announce successful applications shortly
afterwards.

As usual, TAC expects to deal with a range of applications from a
diverse range of backgrounds; therefore, we encourage (as always)
anyone thinking about sending in an application to do so ASAP.

For those who will need a visa to enter the country, we advise you to apply
now so that you have enough time in case of interview delays. So do not
wait until you know whether you have been accepted or not.

We look forward to greeting many of you in Hangzhou, China in July, 2024!

Kind Regards,

Gavin

(On behalf of the Travel Assistance Committee)


Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Mich Talebzadeh
Thanks for your kind words, Sri.

Well, it is true that Spark on Kubernetes is not yet on par with Spark on
YARN in maturity; essentially, Spark on Kubernetes is still a work in
progress. So in the first place, IMO, one needs to think about why the
executors are failing. What causes this behaviour? Is it the code or some
inadequate set-up? These things come to my mind (a quick way of inspecting
the failing executor pods is sketched after the list):


   - Resource Allocation: Insufficient resources (CPU, memory) can lead to
   executor failures.
   - Mis-configuration Issues: Verify that the configurations are
   appropriate for your workload.
   - External Dependencies: If your Spark job relies on external services
   or data sources, ensure they are accessible. Issues such as network
   problems or unavailability of external services can lead to executor
   failures.
   - Data Skew: Uneven distribution of data across partitions can lead to
   data skew and cause some executors to process significantly more data than
   others. This can lead to resource exhaustion on specific executors.
   - Spark Version and Kubernetes Compatibility: Whether Spark is running on
   EKS or GKE, make sure you are using a Spark version that is compatible
   with your Kubernetes environment. These vendors normally run older, more
   stable versions of Spark, and compatibility issues can arise when using a
   newer version of Spark.
   - Docker images: How up-to-date are your docker images on container
   registries (ECR, GCR)? Is there any incompatibility between docker images
   built on one Spark version and the Spark version on the host you are
   running spark-submit from?
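
Before anything else, it is worth inspecting the failed executor pods
directly. As a rough sketch (the namespace below is a placeholder; adjust it
and the pod names to your deployment):

kubectl get pods -n spark-apps -l spark-role=executor
# describe shows events such as OOMKilled or image pull failures
kubectl describe pod <failed-executor-pod> -n spark-apps
# logs from the last failed container of the pod
kubectl logs <failed-executor-pod> -n spark-apps --previous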

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 19 Feb 2024 at 23:18, Sri Potluri  wrote:

> Dear Mich,
>
> Thank you for your detailed response and the suggested approach to
> handling retry logic. I appreciate you taking the time to outline the
> method of embedding custom retry mechanisms directly into the application
> code.
>
> While the solution of wrapping the main logic of the Spark job in a loop
> for controlling the number of retries is technically sound and offers a
> workaround, it may not be the most efficient or maintainable solution for
> organizations running a large number of Spark jobs. Modifying each
> application to include custom retry logic can be a significant undertaking,
> introducing variability in how retries are handled across different jobs,
> and require additional testing and maintenance.
>
> Ideally, operational concerns like retry behavior in response to
> infrastructure failures should be decoupled from the business logic of
> Spark applications. This separation allows data engineers and scientists to
> focus on the application logic without needing to implement and test
> infrastructure resilience mechanisms.
>
> Thank you again for your time and assistance.
>
> Best regards,
> Sri Potluri
>
> On Mon, Feb 19, 2024 at 5:03 PM Mich Talebzadeh 
> wrote:
>
>> Went through your issue with the code running on k8s
>>
>> When an executor of a Spark application fails, the system attempts to
>> maintain the desired level of parallelism by automatically recreating a new
>> executor to replace the failed one. While this behavior is beneficial for
>> transient errors, ensuring that the application continues to run, it
>> becomes problematic in cases where the failure is due to a persistent issue
>> (such as misconfiguration, inaccessible external resources, or incompatible
>> environment settings). In such scenarios, the application enters a loop,
>> continuously trying to recreate executors, which leads to resource wastage
>> and complicates application management.
>>
>> Well fault tolerance is built especially in k8s cluster. You can
>> implement your own logic to control the retry attempts. You can do this
>> by wrapping the main logic of your Spark job in a loop and controlling the
>> number of retries. If a persistent issue is detected, you can choose to
>> stop the job. Today is the third time that looping control has come up :)
>>
>> Take this code
>>
>> import time
>> max_retries = 5 retries = 0 while retries < max_retries: try: # Your
>> Spark job logic here except Exception as e: # Log the exception
>> print(f"Exception in Spark job: {str(e)}") # Increment the retry count
>> retries += 1 # Sleep time.sleep(60) else: # Break out of the loop if the
>> job completes successfully break
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>> 

Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Cheng Pan
Spark has supported the window-based executor failure-tracking mechanism for
YARN for a long time; SPARK-41210 [1][2] (included in 3.5.0) extended this
feature to K8s.

[1] https://issues.apache.org/jira/browse/SPARK-41210
[2] https://github.com/apache/spark/pull/38732
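
For example, when submitting to K8s you could pass something along these
lines to spark-submit (a sketch only; please verify the exact property names
and defaults against the PR and the Spark 3.5 configuration docs):

  --conf spark.executor.maxNumFailures=5 \
  --conf spark.executor.failuresValidityInterval=10m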

Thanks,
Cheng Pan


> On Feb 19, 2024, at 23:59, Sri Potluri  wrote:
> 
> Hello Spark Community,
> 
> I am currently leveraging Spark on Kubernetes, managed by the Spark Operator, 
> for running various Spark applications. While the system generally works 
> well, I've encountered a challenge related to how Spark applications handle 
> executor failures, specifically in scenarios where executors enter an error 
> state due to persistent issues.
> 
> Problem Description
> 
> When an executor of a Spark application fails, the system attempts to 
> maintain the desired level of parallelism by automatically recreating a new 
> executor to replace the failed one. While this behavior is beneficial for 
> transient errors, ensuring that the application continues to run, it becomes 
> problematic in cases where the failure is due to a persistent issue (such as 
> misconfiguration, inaccessible external resources, or incompatible 
> environment settings). In such scenarios, the application enters a loop, 
> continuously trying to recreate executors, which leads to resource wastage 
> and complicates application management.
> 
> Desired Behavior
> 
> Ideally, I would like to have a mechanism to limit the number of retries for 
> executor recreation. If the system fails to successfully create an executor 
> more than a specified number of times (e.g., 5 attempts), the entire Spark 
> application should fail and stop trying to recreate the executor. This 
> behavior would help in efficiently managing resources and avoiding prolonged 
> failure states.
> 
> Questions for the Community
> 
> 1. Is there an existing configuration or method within Spark or the Spark 
> Operator to limit executor recreation attempts and fail the job after 
> reaching a threshold?
>
> 2. Has anyone else encountered similar challenges and found workarounds or 
> solutions that could be applied in this context?
> 
> 
> Additional Context
> 
> I have explored Spark's task and stage retry configurations 
> (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these 
> do not directly address the issue of limiting executor creation retries. 
> Implementing a custom monitoring solution to track executor failures and 
> manually stop the application is a potential workaround, but it would be 
> preferable to have a more integrated solution.
> 
> I appreciate any guidance, insights, or feedback you can provide on this 
> matter.
> 
> Thank you for your time and support.
> 
> Best regards,
> Sri P


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Sri Potluri
Dear Mich,

Thank you for your detailed response and the suggested approach to handling
retry logic. I appreciate you taking the time to outline the method of
embedding custom retry mechanisms directly into the application code.

While the solution of wrapping the main logic of the Spark job in a loop
for controlling the number of retries is technically sound and offers a
workaround, it may not be the most efficient or maintainable solution for
organizations running a large number of Spark jobs. Modifying each
application to include custom retry logic can be a significant undertaking,
introducing variability in how retries are handled across different jobs,
and requiring additional testing and maintenance.

Ideally, operational concerns like retry behavior in response to
infrastructure failures should be decoupled from the business logic of
Spark applications. This separation allows data engineers and scientists to
focus on the application logic without needing to implement and test
infrastructure resilience mechanisms.

Thank you again for your time and assistance.

Best regards,
Sri Potluri

On Mon, Feb 19, 2024 at 5:03 PM Mich Talebzadeh 
wrote:

> Went through your issue with the code running on k8s
>
> When an executor of a Spark application fails, the system attempts to
> maintain the desired level of parallelism by automatically recreating a new
> executor to replace the failed one. While this behavior is beneficial for
> transient errors, ensuring that the application continues to run, it
> becomes problematic in cases where the failure is due to a persistent issue
> (such as misconfiguration, inaccessible external resources, or incompatible
> environment settings). In such scenarios, the application enters a loop,
> continuously trying to recreate executors, which leads to resource wastage
> and complicates application management.
>
> Well fault tolerance is built especially in k8s cluster. You can implement 
> your
> own logic to control the retry attempts. You can do this by wrapping the
> main logic of your Spark job in a loop and controlling the number of
> retries. If a persistent issue is detected, you can choose to stop the job.
> Today is the third time that looping control has come up :)
>
> Take this code
>
> import time
> max_retries = 5 retries = 0 while retries < max_retries: try: # Your Spark
> job logic here except Exception as e: # Log the exception print(f"Exception
> in Spark job: {str(e)}") # Increment the retry count retries += 1 # Sleep
> time.sleep(60) else: # Break out of the loop if the job completes
> successfully break
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Mon, 19 Feb 2024 at 19:21, Mich Talebzadeh 
> wrote:
>
>> Not that I am aware of any configuration parameter in Spark classic to
>> limit executor creation. Because of fault tolerance Spark will try to
>> recreate failed executors. Not really that familiar with the Spark operator
>> for k8s. There may be something there.
>>
>> Have you considered custom monitoring and handling within Spark itself
>> using max_retries = 5  etc?
>>
>> HTH
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Mon, 19 Feb 2024 at 18:34, Sri Potluri  wrote:
>>
>>> Hello Spark Community,
>>>
>>> I am currently leveraging Spark on Kubernetes, managed by the Spark
>>> Operator, for running various Spark applications. While the system
>>> generally works well, I've encountered a challenge related to how Spark
>>> applications handle executor failures, specifically in scenarios where
>>> executors enter an error state due to persistent issues.
>>>
>>> *Problem Description*
>>>
>>> When an executor of a Spark application fails, the system attempts to
>>> maintain the desired level of parallelism by automatically recreating a new
>>> executor to replace the failed one. While this 

Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Mich Talebzadeh
Went through your issue with the code running on k8s

When an executor of a Spark application fails, the system attempts to
maintain the desired level of parallelism by automatically recreating a new
executor to replace the failed one. While this behavior is beneficial for
transient errors, ensuring that the application continues to run, it
becomes problematic in cases where the failure is due to a persistent issue
(such as misconfiguration, inaccessible external resources, or incompatible
environment settings). In such scenarios, the application enters a loop,
continuously trying to recreate executors, which leads to resource wastage
and complicates application management.

Well, fault tolerance is built in, especially in a k8s cluster. You can implement your
own logic to control the retry attempts. You can do this by wrapping the
main logic of your Spark job in a loop and controlling the number of
retries. If a persistent issue is detected, you can choose to stop the job.
Today is the third time that looping control has come up :)

Take this code

import time

max_retries = 5
retries = 0
while retries < max_retries:
    try:
        # Your Spark job logic here
        ...
    except Exception as e:
        # Log the exception
        print(f"Exception in Spark job: {str(e)}")
        # Increment the retry count
        retries += 1
        # Sleep
        time.sleep(60)
    else:
        # Break out of the loop if the job completes successfully
        break

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 19 Feb 2024 at 19:21, Mich Talebzadeh 
wrote:

> Not that I am aware of any configuration parameter in Spark classic to
> limit executor creation. Because of fault tolerance Spark will try to
> recreate failed executors. Not really that familiar with the Spark operator
> for k8s. There may be something there.
>
> Have you considered custom monitoring and handling within Spark itself
> using max_retries = 5  etc?
>
> HTH
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Mon, 19 Feb 2024 at 18:34, Sri Potluri  wrote:
>
>> Hello Spark Community,
>>
>> I am currently leveraging Spark on Kubernetes, managed by the Spark
>> Operator, for running various Spark applications. While the system
>> generally works well, I've encountered a challenge related to how Spark
>> applications handle executor failures, specifically in scenarios where
>> executors enter an error state due to persistent issues.
>>
>> *Problem Description*
>>
>> When an executor of a Spark application fails, the system attempts to
>> maintain the desired level of parallelism by automatically recreating a new
>> executor to replace the failed one. While this behavior is beneficial for
>> transient errors, ensuring that the application continues to run, it
>> becomes problematic in cases where the failure is due to a persistent issue
>> (such as misconfiguration, inaccessible external resources, or incompatible
>> environment settings). In such scenarios, the application enters a loop,
>> continuously trying to recreate executors, which leads to resource wastage
>> and complicates application management.
>>
>> *Desired Behavior*
>>
>> Ideally, I would like to have a mechanism to limit the number of retries
>> for executor recreation. If the system fails to successfully create an
>> executor more than a specified number of times (e.g., 5 attempts), the
>> entire Spark application should fail and stop trying to recreate the
>> executor. This behavior would help in efficiently managing resources and
>> avoiding prolonged failure states.
>>
>> *Questions for the Community*
>>
>> 1. Is there an existing configuration or method within Spark or the Spark
>> Operator to limit executor recreation attempts and fail the job after
>> reaching a threshold?
>>
>> 2. Has anyone else encountered similar challenges and found workarounds
>> or solutions that could be applied in this context?
>>
>>
>> *Additional Context*
>>
>> I have explored Spark's task and 

Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-19 Thread Mich Talebzadeh
Ok thanks for your clarifications

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 19 Feb 2024 at 17:24, Chao Sun  wrote:

> Hi Mich,
>
> > Also have you got some benchmark results from your tests that you can
> possibly share?
>
> We only have some partial benchmark results internally so far. Once
> shuffle and better memory management have been introduced, we plan to
> publish the benchmark results (at least TPC-H) in the repo.
>
> > Compared to standard Spark, what kind of performance gains can be
> expected with Comet?
>
> Currently, users could benefit from Comet in a few areas:
> - Parquet read: a few improvements have been made against reading from S3
> in particular, so users can expect better scan performance in this scenario
> - Hash aggregation
> - Columnar shuffle
> - Decimals (Java's BigDecimal is pretty slow)
>
> > Can one use Comet on k8s in conjunction with something like a Volcano
> addon?
>
> I think so. Comet is mostly orthogonal to the Spark scheduler framework.
>
> Chao
>
>
>
>
>
>
> On Fri, Feb 16, 2024 at 5:39 AM Mich Talebzadeh 
> wrote:
>
>> Hi Chao,
>>
>> As a cool feature
>>
>>
>>- Compared to standard Spark, what kind of performance gains can be
>>expected with Comet?
>>-  Can one use Comet on k8s in conjunction with something like a
>>Volcano addon?
>>
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge, sourced from both personal expertise and other resources but of
>> course cannot be guaranteed . It is essential to note that, as with any
>> advice, one verified and tested result holds more weight than a thousand
>> expert opinions.
>>
>>
>> On Tue, 13 Feb 2024 at 20:42, Chao Sun  wrote:
>>
>>> Hi all,
>>>
>>> We are very happy to announce that Project Comet, a plugin to
>>> accelerate Spark query execution via leveraging DataFusion and Arrow,
>>> has now been open sourced under the Apache Arrow umbrella. Please
>>> check the project repo
>>> https://github.com/apache/arrow-datafusion-comet for more details if
>>> you are interested. We'd love to collaborate with people from the open
>>> source community who share similar goals.
>>>
>>> Thanks,
>>> Chao
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Mich Talebzadeh
Not that I am aware of any configuration parameter in Spark classic to
limit executor creation. Because of fault tolerance Spark will try to
recreate failed executors. Not really that familiar with the Spark operator
for k8s. There may be something there.

Have you considered custom monitoring and handling within Spark itself
using max_retries = 5, etc.?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 19 Feb 2024 at 18:34, Sri Potluri  wrote:

> Hello Spark Community,
>
> I am currently leveraging Spark on Kubernetes, managed by the Spark
> Operator, for running various Spark applications. While the system
> generally works well, I've encountered a challenge related to how Spark
> applications handle executor failures, specifically in scenarios where
> executors enter an error state due to persistent issues.
>
> *Problem Description*
>
> When an executor of a Spark application fails, the system attempts to
> maintain the desired level of parallelism by automatically recreating a new
> executor to replace the failed one. While this behavior is beneficial for
> transient errors, ensuring that the application continues to run, it
> becomes problematic in cases where the failure is due to a persistent issue
> (such as misconfiguration, inaccessible external resources, or incompatible
> environment settings). In such scenarios, the application enters a loop,
> continuously trying to recreate executors, which leads to resource wastage
> and complicates application management.
>
> *Desired Behavior*
>
> Ideally, I would like to have a mechanism to limit the number of retries
> for executor recreation. If the system fails to successfully create an
> executor more than a specified number of times (e.g., 5 attempts), the
> entire Spark application should fail and stop trying to recreate the
> executor. This behavior would help in efficiently managing resources and
> avoiding prolonged failure states.
>
> *Questions for the Community*
>
> 1. Is there an existing configuration or method within Spark or the Spark
> Operator to limit executor recreation attempts and fail the job after
> reaching a threshold?
>
> 2. Has anyone else encountered similar challenges and found workarounds or
> solutions that could be applied in this context?
>
>
> *Additional Context*
>
> I have explored Spark's task and stage retry configurations
> (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these
> do not directly address the issue of limiting executor creation retries.
> Implementing a custom monitoring solution to track executor failures and
> manually stop the application is a potential workaround, but it would be
> preferable to have a more integrated solution.
>
> I appreciate any guidance, insights, or feedback you can provide on this
> matter.
>
> Thank you for your time and support.
>
> Best regards,
> Sri P
>


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-19 Thread Chao Sun
Hi Mich,

> Also have you got some benchmark results from your tests that you can
possibly share?

We only have some partial benchmark results internally so far. Once shuffle
and better memory management have been introduced, we plan to publish the
benchmark results (at least TPC-H) in the repo.

> Compared to standard Spark, what kind of performance gains can be
expected with Comet?

Currently, users could benefit from Comet in a few areas:
- Parquet read: a few improvements have been made against reading from S3
in particular, so users can expect better scan performance in this scenario
- Hash aggregation
- Columnar shuffle
- Decimals (Java's BigDecimal is pretty slow)

> Can one use Comet on k8s in conjunction with something like a Volcano
addon?

I think so. Comet is mostly orthogonal to the Spark scheduler framework.

Chao






On Fri, Feb 16, 2024 at 5:39 AM Mich Talebzadeh 
wrote:

> Hi Chao,
>
> As a cool feature
>
>
>- Compared to standard Spark, what kind of performance gains can be
>expected with Comet?
>-  Can one use Comet on k8s in conjunction with something like a
>Volcano addon?
>
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge, sourced from both personal expertise and other resources but of
> course cannot be guaranteed . It is essential to note that, as with any
> advice, one verified and tested result holds more weight than a thousand
> expert opinions.
>
>
> On Tue, 13 Feb 2024 at 20:42, Chao Sun  wrote:
>
>> Hi all,
>>
>> We are very happy to announce that Project Comet, a plugin to
>> accelerate Spark query execution via leveraging DataFusion and Arrow,
>> has now been open sourced under the Apache Arrow umbrella. Please
>> check the project repo
>> https://github.com/apache/arrow-datafusion-comet for more details if
>> you are interested. We'd love to collaborate with people from the open
>> source community who share similar goals.
>>
>> Thanks,
>> Chao
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


[Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Sri Potluri
Hello Spark Community,

I am currently leveraging Spark on Kubernetes, managed by the Spark
Operator, for running various Spark applications. While the system
generally works well, I've encountered a challenge related to how Spark
applications handle executor failures, specifically in scenarios where
executors enter an error state due to persistent issues.

*Problem Description*

When an executor of a Spark application fails, the system attempts to
maintain the desired level of parallelism by automatically recreating a new
executor to replace the failed one. While this behavior is beneficial for
transient errors, ensuring that the application continues to run, it
becomes problematic in cases where the failure is due to a persistent issue
(such as misconfiguration, inaccessible external resources, or incompatible
environment settings). In such scenarios, the application enters a loop,
continuously trying to recreate executors, which leads to resource wastage
and complicates application management.

*Desired Behavior*

Ideally, I would like to have a mechanism to limit the number of retries
for executor recreation. If the system fails to successfully create an
executor more than a specified number of times (e.g., 5 attempts), the
entire Spark application should fail and stop trying to recreate the
executor. This behavior would help in efficiently managing resources and
avoiding prolonged failure states.

*Questions for the Community*

1. Is there an existing configuration or method within Spark or the Spark
Operator to limit executor recreation attempts and fail the job after
reaching a threshold?

2. Has anyone else encountered similar challenges and found workarounds or
solutions that could be applied in this context?


*Additional Context*

I have explored Spark's task and stage retry configurations
(`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these
do not directly address the issue of limiting executor creation retries.
Implementing a custom monitoring solution to track executor failures and
manually stop the application is a potential workaround, but it would be
preferable to have a more integrated solution.

I appreciate any guidance, insights, or feedback you can provide on this
matter.

Thank you for your time and support.

Best regards,
Sri P


Re: Regarding Spark on Kubernetes(EKS)

2024-02-19 Thread Jagannath Majhi
Yes, I have gone through it. So please help me with the setup. More context:
my JAR file is written in Java.

On Mon, Feb 19, 2024, 8:53 PM Mich Talebzadeh 
wrote:

> Sure but first it would be beneficial to understand the way Spark works on
> Kubernetes and the concept.s
>
> Have a look at this article of mine
>
> Spark on Kubernetes, A Practitioner’s Guide
> 
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  Von
> Braun )".
>
>
> On Mon, 19 Feb 2024 at 15:09, Jagannath Majhi <
> jagannath.ma...@cloud.cbnits.com> wrote:
>
>> Yes
>>
>> On Mon, Feb 19, 2024, 8:35 PM Mich Talebzadeh 
>> wrote:
>>
>>> OK you have a jar file that you want to work with when running using
>>> Spark on k8s as the execution engine (EKS) as opposed to  YARN on EMR as
>>> the execution engine?
>>>
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Mon, 19 Feb 2024 at 14:38, Jagannath Majhi <
>>> jagannath.ma...@cloud.cbnits.com> wrote:
>>>
 I am not using any private docker image. Only I am running the jar file
 in EMR using spark-submit command so now I want to run this jar file in eks
 so can you please tell me how can I set-up for this ??

 On Mon, Feb 19, 2024, 8:06 PM Jagannath Majhi <
 jagannath.ma...@cloud.cbnits.com> wrote:

> Can we connect over Google meet??
>
> On Mon, Feb 19, 2024, 8:03 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Where is your docker file? In ECR container registry.
>> If you are going to use EKS, then it need to be accessible to all
>> nodes of cluster
>>
>> When you build your docker image, put your jar under the $SPARK_HOME
>> directory. Then add a line to your docker build file as below
>> Here I am accessing Google BigQuery DW from EKS
>> # Add a BigQuery connector jar.
>> ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
>> ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
>> RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" \
>> && chown spark:spark "${SPARK_EXTRA_JARS_DIR}"
>> COPY --chown=spark:spark \
>> spark-bigquery-with-dependencies_2.12-0.22.2.jar
>> "${SPARK_EXTRA_JARS_DIR}"
>>
>> Here I am accessing Google BigQuery DW from EKS cluster
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Mon, 19 Feb 2024 at 13:42, Jagannath Majhi <
>> jagannath.ma...@cloud.cbnits.com> wrote:
>>
>>> Dear Spark Community,
>>>
>>> I hope this email finds you well. I am reaching out to seek
>>> assistance and guidance regarding a task I'm currently working on 
>>> involving
>>> Apache Spark.
>>>
>>> I have developed a JAR file that contains some Spark applications
>>> and functionality, and I need to run this JAR file within a Spark 
>>> cluster.
>>> However, the JAR file is located in an AWS S3 bucket. I'm facing some
>>> challenges in configuring Spark to access and execute this JAR file
>>> directly from the S3 

Re: Regarding Spark on Kubernetes(EKS)

2024-02-19 Thread Jagannath Majhi
I am not using any private Docker image. I am only running the JAR file on
EMR using the spark-submit command, and now I want to run this JAR file on
EKS. Can you please tell me how I can set this up?

On Mon, Feb 19, 2024, 8:06 PM Jagannath Majhi <
jagannath.ma...@cloud.cbnits.com> wrote:

> Can we connect over Google meet??
>
> On Mon, Feb 19, 2024, 8:03 PM Mich Talebzadeh 
> wrote:
>
>> Where is your docker file? In ECR container registry.
>> If you are going to use EKS, then it need to be accessible to all nodes
>> of cluster
>>
>> When you build your docker image, put your jar under the $SPARK_HOME
>> directory. Then add a line to your docker build file as below
>> Here I am accessing Google BigQuery DW from EKS
>> # Add a BigQuery connector jar.
>> ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
>> ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
>> RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" \
>> && chown spark:spark "${SPARK_EXTRA_JARS_DIR}"
>> COPY --chown=spark:spark \
>> spark-bigquery-with-dependencies_2.12-0.22.2.jar
>> "${SPARK_EXTRA_JARS_DIR}"
>>
>> Here I am accessing Google BigQuery DW from EKS cluster
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Mon, 19 Feb 2024 at 13:42, Jagannath Majhi <
>> jagannath.ma...@cloud.cbnits.com> wrote:
>>
>>> Dear Spark Community,
>>>
>>> I hope this email finds you well. I am reaching out to seek assistance
>>> and guidance regarding a task I'm currently working on involving Apache
>>> Spark.
>>>
>>> I have developed a JAR file that contains some Spark applications and
>>> functionality, and I need to run this JAR file within a Spark cluster.
>>> However, the JAR file is located in an AWS S3 bucket. I'm facing some
>>> challenges in configuring Spark to access and execute this JAR file
>>> directly from the S3 bucket.
>>>
>>> I would greatly appreciate any advice, best practices, or pointers on
>>> how to achieve this integration effectively. Specifically, I'm looking for
>>> insights on:
>>>
>>>1. Configuring Spark to access and retrieve the JAR file from an AWS
>>>S3 bucket.
>>>2. Setting up the necessary permissions and authentication
>>>mechanisms to ensure seamless access to the S3 bucket.
>>>3. Any potential performance considerations or optimizations when
>>>running Spark applications with dependencies stored in remote storage 
>>> like
>>>AWS S3.
>>>
>>> If anyone in the community has prior experience or knowledge in this
>>> area, I would be extremely grateful for your guidance. Additionally, if
>>> there are any relevant resources, documentation, or tutorials that you
>>> could recommend, it would be incredibly helpful.
>>>
>>> Thank you very much for considering my request. I look forward to
>>> hearing from you and benefiting from the collective expertise of the Spark
>>> community.
>>>
>>> Best regards, Jagannath Majhi
>>>
>>


Re: Regarding Spark on Kubernetes(EKS)

2024-02-19 Thread Mich Talebzadeh
Sure, but first it would be beneficial to understand the way Spark works on
Kubernetes and the concepts.

Have a look at this article of mine

Spark on Kubernetes, A Practitioner’s Guide


HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 19 Feb 2024 at 15:09, Jagannath Majhi <
jagannath.ma...@cloud.cbnits.com> wrote:

> Yes
>
> On Mon, Feb 19, 2024, 8:35 PM Mich Talebzadeh 
> wrote:
>
>> OK you have a jar file that you want to work with when running using
>> Spark on k8s as the execution engine (EKS) as opposed to  YARN on EMR as
>> the execution engine?
>>
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> Von Braun
>> )".
>>
>>
>> On Mon, 19 Feb 2024 at 14:38, Jagannath Majhi <
>> jagannath.ma...@cloud.cbnits.com> wrote:
>>
>>> I am not using any private docker image. I am only running the jar file
>>> in EMR using the spark-submit command; now I want to run this jar file in EKS,
>>> so can you please tell me how to set this up?
>>>
>>> On Mon, Feb 19, 2024, 8:06 PM Jagannath Majhi <
>>> jagannath.ma...@cloud.cbnits.com> wrote:
>>>
 Can we connect over Google meet??

 On Mon, Feb 19, 2024, 8:03 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Where is your docker file? In ECR container registry.
> If you are going to use EKS, then it needs to be accessible to all
> nodes of the cluster.
>
> When you build your docker image, put your jar under the $SPARK_HOME
> directory. Then add a line to your docker build file as below
> Here I am accessing Google BigQuery DW from EKS
> # Add a BigQuery connector jar.
> ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
> ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
> RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" \
> && chown spark:spark "${SPARK_EXTRA_JARS_DIR}"
> COPY --chown=spark:spark \
> spark-bigquery-with-dependencies_2.12-0.22.2.jar
> "${SPARK_EXTRA_JARS_DIR}"
>
> Here I am accessing Google BigQuery DW from EKS cluster
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner
> Von Braun
> )".
>
>
> On Mon, 19 Feb 2024 at 13:42, Jagannath Majhi <
> jagannath.ma...@cloud.cbnits.com> wrote:
>
>> Dear Spark Community,
>>
>> I hope this email finds you well. I am reaching out to seek
>> assistance and guidance regarding a task I'm currently working on 
>> involving
>> Apache Spark.
>>
>> I have developed a JAR file that contains some Spark applications and
>> functionality, and I need to run this JAR file within a Spark cluster.
>> However, the JAR file is located in an AWS S3 bucket. I'm facing some
>> challenges in configuring Spark to access and execute this JAR file
>> directly from the S3 bucket.
>>
>> I would greatly appreciate any advice, best practices, or pointers on
>> how to achieve this integration effectively. Specifically, I'm looking 
>> for
>> insights on:
>>
>>1. Configuring Spark to access and retrieve the JAR file from an
>>AWS S3 bucket.
>>2. Setting 

Re: Regarding Spark on Kubernetes(EKS)

2024-02-19 Thread Mich Talebzadeh
OK, so you have a jar file that you want to use when running Spark on k8s
(EKS) as the execution engine, as opposed to YARN on EMR as the execution
engine?


Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 19 Feb 2024 at 14:38, Jagannath Majhi <
jagannath.ma...@cloud.cbnits.com> wrote:

> I am not using any private docker image. I am only running the jar file in
> EMR using the spark-submit command; now I want to run this jar file in EKS, so
> can you please tell me how to set this up?
>
> On Mon, Feb 19, 2024, 8:06 PM Jagannath Majhi <
> jagannath.ma...@cloud.cbnits.com> wrote:
>
>> Can we connect over Google meet??
>>
>> On Mon, Feb 19, 2024, 8:03 PM Mich Talebzadeh 
>> wrote:
>>
>>> Where is your docker file? In ECR container registry.
>>> If you are going to use EKS, then it needs to be accessible to all nodes
>>> of the cluster.
>>>
>>> When you build your docker image, put your jar under the $SPARK_HOME
>>> directory. Then add a line to your docker build file as below
>>> Here I am accessing Google BigQuery DW from EKS
>>> # Add a BigQuery connector jar.
>>> ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
>>> ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
>>> RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" \
>>> && chown spark:spark "${SPARK_EXTRA_JARS_DIR}"
>>> COPY --chown=spark:spark \
>>> spark-bigquery-with-dependencies_2.12-0.22.2.jar
>>> "${SPARK_EXTRA_JARS_DIR}"
>>>
>>> Here I am accessing Google BigQuery DW from EKS cluster
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> Von Braun
>>> )".
>>>
>>>
>>> On Mon, 19 Feb 2024 at 13:42, Jagannath Majhi <
>>> jagannath.ma...@cloud.cbnits.com> wrote:
>>>
 Dear Spark Community,

 I hope this email finds you well. I am reaching out to seek assistance
 and guidance regarding a task I'm currently working on involving Apache
 Spark.

 I have developed a JAR file that contains some Spark applications and
 functionality, and I need to run this JAR file within a Spark cluster.
 However, the JAR file is located in an AWS S3 bucket. I'm facing some
 challenges in configuring Spark to access and execute this JAR file
 directly from the S3 bucket.

 I would greatly appreciate any advice, best practices, or pointers on
 how to achieve this integration effectively. Specifically, I'm looking for
 insights on:

1. Configuring Spark to access and retrieve the JAR file from an
AWS S3 bucket.
2. Setting up the necessary permissions and authentication
mechanisms to ensure seamless access to the S3 bucket.
3. Any potential performance considerations or optimizations when
running Spark applications with dependencies stored in remote storage 
 like
AWS S3.

 If anyone in the community has prior experience or knowledge in this
 area, I would be extremely grateful for your guidance. Additionally, if
 there are any relevant resources, documentation, or tutorials that you
 could recommend, it would be incredibly helpful.

 Thank you very much for considering my request. I look forward to
 hearing from you and benefiting from the collective expertise of the Spark
 community.

 Best regards, Jagannath Majhi

>>>


Re: Regarding Spark on Kubernetes(EKS)

2024-02-19 Thread Mich Talebzadeh
Where is your docker file? In ECR container registry.
If you are going to use EKS, then it needs to be accessible to all nodes of
the cluster.

When you build your docker image, put your jar under the $SPARK_HOME
directory. Then add a line to your docker build file as below
Here I am accessing Google BigQuery DW from EKS
# Add a BigQuery connector jar.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" \
&& chown spark:spark "${SPARK_EXTRA_JARS_DIR}"
COPY --chown=spark:spark \
spark-bigquery-with-dependencies_2.12-0.22.2.jar
"${SPARK_EXTRA_JARS_DIR}"

Here I am accessing Google BigQuery DW from EKS cluster

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 19 Feb 2024 at 13:42, Jagannath Majhi <
jagannath.ma...@cloud.cbnits.com> wrote:

> Dear Spark Community,
>
> I hope this email finds you well. I am reaching out to seek assistance and
> guidance regarding a task I'm currently working on involving Apache Spark.
>
> I have developed a JAR file that contains some Spark applications and
> functionality, and I need to run this JAR file within a Spark cluster.
> However, the JAR file is located in an AWS S3 bucket. I'm facing some
> challenges in configuring Spark to access and execute this JAR file
> directly from the S3 bucket.
>
> I would greatly appreciate any advice, best practices, or pointers on how
> to achieve this integration effectively. Specifically, I'm looking for
> insights on:
>
>1. Configuring Spark to access and retrieve the JAR file from an AWS
>S3 bucket.
>2. Setting up the necessary permissions and authentication mechanisms
>to ensure seamless access to the S3 bucket.
>3. Any potential performance considerations or optimizations when
>running Spark applications with dependencies stored in remote storage like
>AWS S3.
>
> If anyone in the community has prior experience or knowledge in this area,
> I would be extremely grateful for your guidance. Additionally, if there are
> any relevant resources, documentation, or tutorials that you could
> recommend, it would be incredibly helpful.
>
> Thank you very much for considering my request. I look forward to hearing
> from you and benefiting from the collective expertise of the Spark
> community.
>
> Best regards, Jagannath Majhi
>


Re: Regarding Spark on Kubernetes(EKS)

2024-02-19 Thread Richard Smith

I run my Spark jobs in GCP with Google Dataproc using GCS buckets.

I've not used AWS, but its EMR product offers similar functionality to 
Dataproc. The title of your post implies your Spark cluster runs on EKS. 
You might be better off using EMR, see links below:


EMR 
https://medium.com/big-data-on-amazon-elastic-mapreduce/run-a-spark-job-within-amazon-emr-in-15-minutes-68b02af1ae16


EKS https://medium.com/@vikas.navlani/running-spark-on-aws-eks-1cd4c31786c

Richard

On 19/02/2024 13:36, Jagannath Majhi wrote:


Dear Spark Community,

I hope this email finds you well. I am reaching out to seek assistance 
and guidance regarding a task I'm currently working on involving 
Apache Spark.


I have developed a JAR file that contains some Spark applications and 
functionality, and I need to run this JAR file within a Spark cluster. 
However, the JAR file is located in an AWS S3 bucket. I'm facing some 
challenges in configuring Spark to access and execute this JAR file 
directly from the S3 bucket.


I would greatly appreciate any advice, best practices, or pointers on 
how to achieve this integration effectively. Specifically, I'm looking 
for insights on:


 1. Configuring Spark to access and retrieve the JAR file from an AWS
S3 bucket.
 2. Setting up the necessary permissions and authentication mechanisms
to ensure seamless access to the S3 bucket.
 3. Any potential performance considerations or optimizations when
running Spark applications with dependencies stored in remote
storage like AWS S3.

If anyone in the community has prior experience or knowledge in this 
area, I would be extremely grateful for your guidance. Additionally, 
if there are any relevant resources, documentation, or tutorials that 
you could recommend, it would be incredibly helpful.


Thank you very much for considering my request. I look forward to 
hearing from you and benefiting from the collective expertise of the 
Spark community.


Best regards, Jagannath Majhi


Regarding Spark on Kubernetes(EKS)

2024-02-19 Thread Jagannath Majhi
Dear Spark Community,

I hope this email finds you well. I am reaching out to seek assistance and
guidance regarding a task I'm currently working on involving Apache Spark.

I have developed a JAR file that contains some Spark applications and
functionality, and I need to run this JAR file within a Spark cluster.
However, the JAR file is located in an AWS S3 bucket. I'm facing some
challenges in configuring Spark to access and execute this JAR file
directly from the S3 bucket.

I would greatly appreciate any advice, best practices, or pointers on how
to achieve this integration effectively. Specifically, I'm looking for
insights on:

   1. Configuring Spark to access and retrieve the JAR file from an AWS S3
   bucket.
   2. Setting up the necessary permissions and authentication mechanisms to
   ensure seamless access to the S3 bucket.
   3. Any potential performance considerations or optimizations when
   running Spark applications with dependencies stored in remote storage like
   AWS S3.

If anyone in the community has prior experience or knowledge in this area,
I would be extremely grateful for your guidance. Additionally, if there are
any relevant resources, documentation, or tutorials that you could
recommend, it would be incredibly helpful.

Thank you very much for considering my request. I look forward to hearing
from you and benefiting from the collective expertise of the Spark
community.

Best regards, Jagannath Majhi


Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-19 Thread Mich Talebzadeh
OK got it

Someone asked a similar, though not shuffle-related, question in the Spark Slack
channel. This is a simple Python example that creates shuffle files in
shuffle_directory = "/tmp/spark_shuffles", simulates a long-lived app
using a loop, and periodically cleans up shuffle files older than 1 second.
Take it for a spin.

import os
import glob
import time
from datetime import datetime, timedelta
import shutil
from pyspark.sql import SparkSession

def generate_shuffle_data(spark, shuffle_directory):
    # Generate some Mickey Mouse data
    data = [("A", 1), ("B", 2), ("A", 3), ("C", 4), ("B", 5)]
    columns = ["column_to_check", "partition_column"]
    df = spark.createDataFrame(data, columns)
    df.printSchema()

    # Write DataFrame with shuffle to the specified output path
    df.write.mode("overwrite").partitionBy("partition_column").parquet(shuffle_directory)

def simulate_long_lived_spark_app():
    shuffle_directory = "/tmp/spark_shuffles"

    # Remove the directory if it exists
    if os.path.exists(shuffle_directory):
        shutil.rmtree(shuffle_directory)

    # Create the directory
    os.makedirs(shuffle_directory)

    spark = SparkSession.builder.appName("shuffleCleanupExample").getOrCreate()

    try:
        for iteration in range(1, 6):  # Simulating 5 iterations of the long-lived Spark app
            print(f"Iteration {iteration}")

            # Generate and write shuffle data
            generate_shuffle_data(spark, shuffle_directory)

            # Perform some Spark operations (simulated processing)
            # your code

            # Periodically clean up shuffle files older than 1 second
            try:
                cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds=1)
                print("Shuffle cleanup successful.")
            except Exception as e:
                print(f"Error during shuffle cleanup: {str(e)}")

            # Simulate some delay between iterations
            time.sleep(2)

    finally:
        # Stop the Spark session
        spark.stop()

def cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds):
    current_time = datetime.now()

    # Iterate through shuffle files in the directory
    for shuffle_file in os.listdir(shuffle_directory):
        shuffle_file_path = os.path.join(shuffle_directory, shuffle_file)

        # Check if it's a file and not a directory
        if os.path.isfile(shuffle_file_path):
            # Get the creation time of the file
            file_creation_time = datetime.fromtimestamp(os.path.getctime(shuffle_file_path))

            # Calculate the age of the file in seconds
            age_seconds = (current_time - file_creation_time).total_seconds()

            try:
                # Check if the file is older than the specified max_age_seconds
                if age_seconds > max_age_seconds:
                    # Perform cleanup (delete the file)
                    os.remove(shuffle_file_path)
                    print(f"Deleted old shuffle file: {shuffle_file_path}")

            except Exception as e:
                print(f"Error during cleanup: {str(e)}")

# Run the simulation
simulate_long_lived_spark_app()

... and the output:

Iteration 1
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 2
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 3
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 4
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 5
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Mon, 19 Feb 2024 at 08:27, Saha, Daniel  wrote:

> Thanks for the suggestions Mich, Jörn, and Adam.
>
>
>
> The rationale for long-lived app with loop versus submitting multiple yarn
> applications is mainly for simplicity. Plan to run app on an multi-tenant
> EMR cluster alongside other yarn apps. Implementing the loop outside the
> Spark app will work but introduces more complexity compared to single
> long-lived Spark app with dynamic allocation + min executors. Specifically,
>
>- Introduce 

Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-19 Thread Saha, Daniel
Thanks for the suggestions Mich, Jörn, and Adam.

The rationale for a long-lived app with a loop versus submitting multiple YARN 
applications is mainly simplicity. I plan to run the app on a multi-tenant EMR 
cluster alongside other YARN apps. Implementing the loop outside the Spark app 
will work but introduces more complexity compared to a single long-lived Spark 
app with dynamic allocation + min executors. Specifically, I would need to:

  *   Introduce a component that submits an EMR step to run `spark-submit`
  *   Define a YARN queue for my app such that resources are reserved and other 
tenants will not prevent my app from entering the RUNNING state
  *   Determine whether the previous YARN app is FINISHED (or just submit a 
bunch of apps up front and rely on the YARN SUBMITTED/ACCEPTED states)

So I was really hoping to be able to recreate the SparkContext, or at least 
find some way to trigger a clean of the DiskBlockManager in between loop 
iterations. If there is no way to do this, I will test the performance of cloud-based 
shuffle. This might be better for cost savings too (S3 vs. EBS) and allow me to 
use smaller instances (I was using beefy instances and beefy executors to 
improve shuffle locality).

To the other points:

  1.  Dynamic allocation is enabled; I suspect it is not the issue here. Enabling 
`spark.shuffle.service.removeShuffle` didn't seem to help much – likely 
because executors are not being decommissioned often due to the nature of the tight 
loop and the fact the executor timeout was already raised from the 60s default to 300s.
  2.  Cloud shuffle + an S3 lifecycle policy or brute force/cron removing files 
will for sure work, but I am looking for something more "elegant".
  3.  Shuffle data should be deleted after it's no longer needed. From my 
understanding of the Spark codebase, the only time the DiskBlockManager cleans 
the `spark.local.dir` directory [1] is when stop() is called – which only 
happens when the SparkEnv is stopped [2].
  4.  I suspect spilled data is not what's filling up the disk since the app barely 
spills to disk [3]. Also supporting this hypothesis: raising 
`spark.shuffle.sort.bypassMergeThreshold` to above the max reducer partitions 
significantly slowed the rate of disk usage.
Daniel

[1] 
https://github.com/apache/spark/blob/8f5a647b0bbb6e83ee484091d3422b24baea5a80/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L369
[2] 
https://github.com/apache/spark/blob/c4e4497ff7e747eb71d087cdfb1b51673c53b83b/core/src/main/scala/org/apache/spark/SparkEnv.scala#L112
[3] Was able to eliminate most of the skew during repartitionByRange by 
dynamically salting keys using the results of df.stat.countMinSketch


From: Mich Talebzadeh 
Date: Sunday, February 18, 2024 at 1:38 AM
Cc: "user@spark.apache.org" 
Subject: RE: [EXTERNAL] Re-create SparkContext of SparkSession inside 
long-lived Spark app




Hi,

What do you propose or you think will help when these spark jobs are 
independent of each other --> So once a job/iterator is complete, there is no 
need to retain these shuffle files. You have a number of options to consider 
starting from spark configuration parameters and so forth

https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior

Aside, have you turned on dynamic resource allocation and the relevant 
parameters. Can you up executor memory -> spark.storage.,memoryFraction and 
spark.shuffle.spillThreshold as well? You can of course use brute force with 
shutil.rmtree(path) to remove these files.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 
   view my Linkedin 
profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed . It is essential to note that, as with any 
advice, one verified and tested result holds more weight than a thousand expert 
opinions.


On Sat, 17 Feb 2024 at 23:40, Saha, Daniel  wrote:
Hi,

Background: I am running into executor disk space issues when running a 
long-lived Spark 3.3 app with YARN on AWS EMR. The app performs back-to-back 
spark jobs in a sequential loop with each iteration performing 100gb+ shuffles. 
The files taking up the space are related to shuffle blocks [1]. Disk is only 
cleared when restarting the YARN app. For all intents and purposes, each job is 
independent. So once a job/iterator is complete, there is no need to retain 
these shuffle files. I want to try stopping and recreating the Spark context 
between loop iterations/jobs to indicate to Spark DiskBlockManager that these 
intermediate results are no longer needed [2].


Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-18 Thread Mich Talebzadeh
Hi,

What do you propose or you think will help when these spark jobs are
independent of each other --> So once a job/iterator is complete, there is
no need to retain these shuffle files. You have a number of options to
consider starting from spark configuration parameters and so forth

https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior

Aside, have you turned on dynamic resource allocation and the relevant
parameters? Can you up executor memory -> spark.storage.memoryFraction
and spark.shuffle.spillThreshold as well? You can of course use brute force
with shutil.rmtree(path) to remove these files.
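
For the brute force route, a rough sketch (the path pattern follows the layout
in Daniel's report and the 24-hour retention window is just an assumption; only
run it against directories the application no longer references):

import glob
import os
import shutil
import time

MAX_AGE_SECONDS = 24 * 3600  # assumed retention window, tune to your job cadence

# Delete stale blockmgr-* directories left behind under the YARN app cache
for blockmgr_dir in glob.glob(
        "/mnt/yarn/usercache/hadoop/appcache/application_*/blockmgr-*"):
    if time.time() - os.path.getmtime(blockmgr_dir) > MAX_AGE_SECONDS:
        shutil.rmtree(blockmgr_dir, ignore_errors=True)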

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, one verified and tested result holds more weight
than a thousand expert opinions.


On Sat, 17 Feb 2024 at 23:40, Saha, Daniel 
wrote:

> Hi,
>
>
>
> *Background*: I am running into executor disk space issues when running a
> long-lived Spark 3.3 app with YARN on AWS EMR. The app performs
> back-to-back spark jobs in a sequential loop with each iteration performing
> 100gb+ shuffles. The files taking up the space are related to shuffle
> blocks [1]. Disk is only cleared when restarting the YARN app. For all
> intents and purposes, each job is independent. So once a job/iterator is
> complete, there is no need to retain these shuffle files. I want to try
> stopping and recreating the Spark context between loop iterations/jobs to
> indicate to Spark DiskBlockManager that these intermediate results are no
> longer needed [2].
>
>
>
> *Questions*:
>
>- Are there better ways to remove/clean the directory containing these
>old, no longer used, shuffle results (aside from cron or restarting yarn
>app)?
>- How to recreate the spark context within a single application? I see
>no methods in Spark Session for doing this, and each new Spark session
>re-uses the existing spark context. After stopping the SparkContext,
>SparkSession does not re-create a new one. Further, creating a new
>SparkSession via constructor and passing in a new SparkContext is not
>allowed as it is a protected/private method.
>
>
>
> Thanks
>
> Daniel
>
>
>
> [1]
> /mnt/yarn/usercache/hadoop/appcache/application_1706835946137_0110/blockmgr-eda47882-56d6-4248-8e30-a959ddb912c5
>
> [2] https://stackoverflow.com/a/38791921
>


Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-17 Thread Jörn Franke
You can try to shuffle to S3 using the cloud shuffle storage plugin for S3
(https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/)
- the performance of the new plugin is sufficient for many Spark jobs (it
also works on EMR). Then you can use S3 lifecycle policies to clean up/expire
objects older than one day
(https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html)
- this also cleans up files from crashed Spark jobs.
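
A minimal boto3 sketch of such a lifecycle rule (the bucket name and the
"shuffle/" prefix are placeholders; point the rule at whatever prefix the
shuffle plugin writes under):

import boto3

s3 = boto3.client("s3")

# Expire objects under the shuffle prefix after one day
s3.put_bucket_lifecycle_configuration(
    Bucket="my-spark-shuffle-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-spark-shuffle",
                "Filter": {"Prefix": "shuffle/"},
                "Status": "Enabled",
                "Expiration": {"Days": 1},
            }
        ]
    },
)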

For shuffle on disk you do not have many choices, as you mentioned. I would,
though, avoid having a long-living app that loops - that never works well on Spark
(it is designed for batch jobs that eventually stop). Maybe you can simply
trigger a new job when a new file arrives (S3 events?).

> Am 18.02.2024 um 00:39 schrieb Saha, Daniel :
> 
> 
> Hi,
>  
> Background: I am running into executor disk space issues when running a 
> long-lived Spark 3.3 app with YARN on AWS EMR. The app performs back-to-back 
> spark jobs in a sequential loop with each iteration performing 100gb+ 
> shuffles. The files taking up the space are related to shuffle blocks [1]. 
> Disk is only cleared when restarting the YARN app. For all intents and 
> purposes, each job is independent. So once a job/iterator is complete, there 
> is no need to retain these shuffle files. I want to try stopping and 
> recreating the Spark context between loop iterations/jobs to indicate to 
> Spark DiskBlockManager that these intermediate results are no longer needed 
> [2].
>  
> Questions:
> Are there better ways to remove/clean the directory containing these old, no 
> longer used, shuffle results (aside from cron or restarting yarn app)?
> How to recreate the spark context within a single application? I see no 
> methods in Spark Session for doing this, and each new Spark session re-uses 
> the existing spark context. After stopping the SparkContext, SparkSession 
> does not re-create a new one. Further, creating a new SparkSession via 
> constructor and passing in a new SparkContext is not allowed as it is a 
> protected/private method.
>  
> Thanks
> Daniel
>  
> [1] 
> /mnt/yarn/usercache/hadoop/appcache/application_1706835946137_0110/blockmgr-eda47882-56d6-4248-8e30-a959ddb912c5
> [2] https://stackoverflow.com/a/38791921


Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-17 Thread Adam Binford
If you're using dynamic allocation it could be caused by executors with
shuffle data being deallocated before the shuffle is cleaned up. These
shuffle files will never get cleaned up once that happens until the Yarn
application ends. This was a big issue for us so I added support for
deleting shuffle data via the shuffle service for deallocated executors
that landed in Spark 3.3, but it is disabled by default. See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L698
.

spark.shuffle.service.removeShuffle
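
As a minimal sketch (PySpark here for brevity), assuming Spark 3.3+ with
dynamic allocation and the external shuffle service enabled, since the flag is
off by default:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("remove-shuffle-example")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    # let the shuffle service delete shuffle data of deallocated executors
    .config("spark.shuffle.service.removeShuffle", "true")
    .getOrCreate()
)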

If you're not using dynamic allocation then I'm not sure, shuffle data
should be deleted once it's no longer needed (through garbage collection
mechanisms referencing the shuffle). Maybe just make sure any variables
referencing the first DataFrame go out of scope.

Adam

On Sat, Feb 17, 2024 at 6:40 PM Saha, Daniel 
wrote:

> Hi,
>
>
>
> *Background*: I am running into executor disk space issues when running a
> long-lived Spark 3.3 app with YARN on AWS EMR. The app performs
> back-to-back spark jobs in a sequential loop with each iteration performing
> 100gb+ shuffles. The files taking up the space are related to shuffle
> blocks [1]. Disk is only cleared when restarting the YARN app. For all
> intents and purposes, each job is independent. So once a job/iterator is
> complete, there is no need to retain these shuffle files. I want to try
> stopping and recreating the Spark context between loop iterations/jobs to
> indicate to Spark DiskBlockManager that these intermediate results are no
> longer needed [2].
>
>
>
> *Questions*:
>
>- Are there better ways to remove/clean the directory containing these
>old, no longer used, shuffle results (aside from cron or restarting yarn
>app)?
>- How to recreate the spark context within a single application? I see
>no methods in Spark Session for doing this, and each new Spark session
>re-uses the existing spark context. After stopping the SparkContext,
>SparkSession does not re-create a new one. Further, creating a new
>SparkSession via constructor and passing in a new SparkContext is not
>allowed as it is a protected/private method.
>
>
>
> Thanks
>
> Daniel
>
>
>
> [1]
> /mnt/yarn/usercache/hadoop/appcache/application_1706835946137_0110/blockmgr-eda47882-56d6-4248-8e30-a959ddb912c5
>
> [2] https://stackoverflow.com/a/38791921
>


-- 
Adam Binford


Re: job uuid not unique

2024-02-16 Thread Mich Talebzadeh
As a bare minimum you will need to add some error trapping and exception
handling!

scala> import org.apache.hadoop.fs.FileAlreadyExistsException
import org.apache.hadoop.fs.FileAlreadyExistsException

and try your code

try {
  df
    .coalesce(1)
    .write
    .option("fs.s3a.committer.require.uuid", "true")
    .option("fs.s3a.committer.generate.uuid", "true")
    .option("fs.s3a.committer.name", "magic")
    .option("fs.s3a.committer.magic.enabled", "true")
    .option("orc.compress", "zlib")
    .mode(SaveMode.Append)
    .orc(path)
} catch {
  case e: FileAlreadyExistsException =>
    println("File already exists. Handling it...")
  // other catch blocks for the other exceptions?
}

Catching FileAlreadyExistsException allows you to continue without crashing, etc.

Another brute-force option: instead of SaveMode.Append, you can try using
SaveMode.Overwrite. This will overwrite the existing data if it already
exists.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, one verified and tested result holds more weight
than a thousand expert opinions.


On Fri, 16 Feb 2024 at 21:25, Рамик И  wrote:

>
> Hi
> I'm using Spark Streaming to read from Kafka and write to S3. Sometimes I
> get errors when writing org.apache.hadoop.fs.FileAlreadyExistsException.
>
> Spark version: 3.5.0
> scala version : 2.13.8
> Cluster: k8s
>
> libraryDependencies
> org.apache.hadoop.hadoop-aws3.3.4
> com.amazonaws.aws-java-sdk-s31.12.600
>
>
>
> code:
> df
> .coalesce(1)
> .write
> .option("fs.s3a.committer.require.uuid", "true")
>  .option("fs.s3a.committer.generate.uuid", "true")
> .option("fs.s3a.committer.name", "magic")
> .option("fs.s3a.committer.magic.enabled", "true")
>  .option("orc.compress", "zlib")
>  .mode(SaveMode.Append)
> .orc(path)
>
>
>
> executor 9
>
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Job UUID
> 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using committer magic
> to output data to s3a://mybucket/test
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using Committer
> MagicCommitter{AbstractS3ACommitter{role=Task committer
> attempt_202402161305112153373254688311399_0367_m_00_13217, name=magic,
> outputPath=s3a://mybucket/test,
> workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_00_13217/__base,
> uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid
> source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:25 INFO SQLHadoopMapReduceCommitProtocol: Using output
> committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Starting: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13217
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13217: duration
> 0:00.061s
> 24/02/16 13:05:25 ERROR Executor: Exception in task 0.2 in stage 367.1
> (TID 13217)
> org.apache.hadoop.fs.FileAlreadyExistsException:
> s3a://mybucket/test/part-0-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc
> already exists
>
>
> executor 10
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Job UUID
> 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using committer magic
> to output data to s3a://mybucket/test
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using Committer
> MagicCommitter{AbstractS3ACommitter{role=Task committer
> attempt_202402161305112153373254688311399_0367_m_00_13216, name=magic,
> outputPath=s3a://mybucket/test,
> workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_00_13216/__base,
> uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid
> source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:24 INFO SQLHadoopMapReduceCommitProtocol: Using output
> committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Starting: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13216
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13216: duration
> 0:00.112s
> 24/02/16 13:05:24 ERROR Executor: Exception in task 0.1 in stage 367.1
> (TID 13216)
> org.apache.hadoop.fs.FileAlreadyExistsException:
> s3a://mybucket/test/part-0-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc
> 

Effectively append the dataset to avro directory

2024-02-16 Thread Rushikesh Kavar
Hello Community,

I checked the issue below on various platforms but could not get a satisfactory
answer.

I am using Spark with Java.
I have a large data cluster.
My application makes more than 10 API calls.
Each call returns a Java list. Each list item has the same structure (i.e. the
same Java class).

I want to write each list to the same Avro directory,
for example /data/network_response/.

What I am trying: after each API call, I convert the list to an
org.apache.spark.sql.Dataset object
using this snippet: sqlcontext.createdataframe(results,
class_of_list_entry)

and then write the data to disk
using the snippet below:

records.write().mode(savemode).format(com.databricks.spark.avro).save(folder_path)

Here, savemode is set to overwrite for the first API call's write. For
subsequent network calls, I set savemode to append.

I want to effectively write all API responses to the same directory in Avro
format.


My questions are:

Is this an efficient way to do it?

If I append Avro content, does the existing content get overwritten
each time?
Where is the documentation specifying the algorithm Spark uses to append
Avro content in the way I described above?
What exactly happens when we append the dataset to an existing Avro folder?

Should I collect all API datasets into one union dataset and then write the
Avro content once, rather than writing Avro content to the directory after each
network call?
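
A minimal PySpark sketch of that union-once-then-write-once variant (fetch_page
is a hypothetical stand-in for the real API calls, the output path mirrors the
example above, and it assumes the built-in Avro data source is on the classpath):

from functools import reduce
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.appName("api-to-avro").getOrCreate()

# One DataFrame per API response; fetch_page() is a placeholder returning a
# list of records that all share the same structure
frames = [spark.createDataFrame(fetch_page(i)) for i in range(10)]

# Union all responses and write the directory once instead of appending per call
combined = reduce(DataFrame.unionByName, frames)
combined.write.mode("overwrite").format("avro").save("/data/network_response/")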

Regards,
Rushikesh.


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-16 Thread Mich Talebzadeh
Hi Chao,

This looks like a cool feature. A couple of questions:


   - Compared to standard Spark, what kind of performance gains can be
   expected with Comet?
   - Can one use Comet on k8s in conjunction with something like a Volcano
   add-on?


HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge, sourced from both personal expertise and other resources but of
course cannot be guaranteed . It is essential to note that, as with any
advice, one verified and tested result holds more weight than a thousand
expert opinions.


On Tue, 13 Feb 2024 at 20:42, Chao Sun  wrote:

> Hi all,
>
> We are very happy to announce that Project Comet, a plugin to
> accelerate Spark query execution via leveraging DataFusion and Arrow,
> has now been open sourced under the Apache Arrow umbrella. Please
> check the project repo
> https://github.com/apache/arrow-datafusion-comet for more details if
> you are interested. We'd love to collaborate with people from the open
> source community who share similar goals.
>
> Thanks,
> Chao
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-15 Thread Mich Talebzadeh
Hi, I gather from the replies that the plugin is not currently available in
the form expected, although I am aware of the shell script.

Also, have you got any benchmark results from your tests that you could
possibly share?

Thanks,

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge, sourced from both personal expertise and other resources but of
course cannot be guaranteed . It is essential to note that, as with any
advice, one verified and tested result holds more weight than a thousand
expert opinions.


On Thu, 15 Feb 2024 at 01:18, Chao Sun  wrote:

> Hi Praveen,
>
> We will add a "Getting Started" section in the README soon, but basically
> comet-spark-shell
> 
>  in
> the repo should provide a basic tool to build Comet and launch a Spark
> shell with it.
>
> Note that we haven't open sourced several features yet including shuffle
> support, which the aggregate operation depends on. Please stay tuned!
>
> Chao
>
>
> On Wed, Feb 14, 2024 at 2:44 PM praveen sinha 
> wrote:
>
>> Hi Chao,
>>
>> Is there any example app/gist/repo which can help me use this plugin. I
>> wanted to try out some realtime aggregate performance on top of parquet and
>> spark dataframes.
>>
>> Thanks and Regards
>> Praveen
>>
>>
>> On Wed, Feb 14, 2024 at 9:20 AM Chao Sun  wrote:
>>
>>> > Out of interest what are the differences in the approach between this
>>> and Glutten?
>>>
>>> Overall they are similar, although Gluten supports multiple backends
>>> including Velox and Clickhouse. One major difference is (obviously)
>>> Comet is based on DataFusion and Arrow, and written in Rust, while
>>> Gluten is mostly C++.
>>> I haven't looked very deep into Gluten yet, but there could be other
>>> differences such as how strictly the engine follows Spark's semantics,
>>> table format support (Iceberg, Delta, etc), fallback mechanism
>>> (coarse-grained fallback on stage level or more fine-grained fallback
>>> within stages), UDF support (Comet hasn't started on this yet),
>>> shuffle support, memory management, etc.
>>>
>>> Both engines are backed by very strong and vibrant open source
>>> communities (Velox, Clickhouse, Arrow & DataFusion) so it's very
>>> exciting to see how the projects will grow in future.
>>>
>>> Best,
>>> Chao
>>>
>>> On Tue, Feb 13, 2024 at 10:06 PM John Zhuge  wrote:
>>> >
>>> > Congratulations! Excellent work!
>>> >
>>> > On Tue, Feb 13, 2024 at 8:04 PM Yufei Gu  wrote:
>>> >>
>>> >> Absolutely thrilled to see the project going open-source! Huge
>>> congrats to Chao and the entire team on this milestone!
>>> >>
>>> >> Yufei
>>> >>
>>> >>
>>> >> On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:
>>> >>>
>>> >>> Hi all,
>>> >>>
>>> >>> We are very happy to announce that Project Comet, a plugin to
>>> >>> accelerate Spark query execution via leveraging DataFusion and Arrow,
>>> >>> has now been open sourced under the Apache Arrow umbrella. Please
>>> >>> check the project repo
>>> >>> https://github.com/apache/arrow-datafusion-comet for more details if
>>> >>> you are interested. We'd love to collaborate with people from the
>>> open
>>> >>> source community who share similar goals.
>>> >>>
>>> >>> Thanks,
>>> >>> Chao
>>> >>>
>>> >>> -
>>> >>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>> >>>
>>> >
>>> >
>>> > --
>>> > John Zhuge
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-14 Thread Chao Sun
Hi Praveen,

We will add a "Getting Started" section in the README soon, but basically
comet-spark-shell

in
the repo should provide a basic tool to build Comet and launch a Spark
shell with it.

Note that we haven't open sourced several features yet including shuffle
support, which the aggregate operation depends on. Please stay tuned!

Chao


On Wed, Feb 14, 2024 at 2:44 PM praveen sinha 
wrote:

> Hi Chao,
>
> Is there any example app/gist/repo which can help me use this plugin. I
> wanted to try out some realtime aggregate performance on top of parquet and
> spark dataframes.
>
> Thanks and Regards
> Praveen
>
>
> On Wed, Feb 14, 2024 at 9:20 AM Chao Sun  wrote:
>
>> > Out of interest what are the differences in the approach between this
>> and Glutten?
>>
>> Overall they are similar, although Gluten supports multiple backends
>> including Velox and Clickhouse. One major difference is (obviously)
>> Comet is based on DataFusion and Arrow, and written in Rust, while
>> Gluten is mostly C++.
>> I haven't looked very deep into Gluten yet, but there could be other
>> differences such as how strictly the engine follows Spark's semantics,
>> table format support (Iceberg, Delta, etc), fallback mechanism
>> (coarse-grained fallback on stage level or more fine-grained fallback
>> within stages), UDF support (Comet hasn't started on this yet),
>> shuffle support, memory management, etc.
>>
>> Both engines are backed by very strong and vibrant open source
>> communities (Velox, Clickhouse, Arrow & DataFusion) so it's very
>> exciting to see how the projects will grow in future.
>>
>> Best,
>> Chao
>>
>> On Tue, Feb 13, 2024 at 10:06 PM John Zhuge  wrote:
>> >
>> > Congratulations! Excellent work!
>> >
>> > On Tue, Feb 13, 2024 at 8:04 PM Yufei Gu  wrote:
>> >>
>> >> Absolutely thrilled to see the project going open-source! Huge
>> congrats to Chao and the entire team on this milestone!
>> >>
>> >> Yufei
>> >>
>> >>
>> >> On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:
>> >>>
>> >>> Hi all,
>> >>>
>> >>> We are very happy to announce that Project Comet, a plugin to
>> >>> accelerate Spark query execution via leveraging DataFusion and Arrow,
>> >>> has now been open sourced under the Apache Arrow umbrella. Please
>> >>> check the project repo
>> >>> https://github.com/apache/arrow-datafusion-comet for more details if
>> >>> you are interested. We'd love to collaborate with people from the open
>> >>> source community who share similar goals.
>> >>>
>> >>> Thanks,
>> >>> Chao
>> >>>
>> >>> -
>> >>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>>
>> >
>> >
>> > --
>> > John Zhuge
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-14 Thread praveen sinha
Hi Chao,

Is there any example app/gist/repo which can help me use this plugin. I
wanted to try out some realtime aggregate performance on top of parquet and
spark dataframes.

Thanks and Regards
Praveen


On Wed, Feb 14, 2024 at 9:20 AM Chao Sun  wrote:

> > Out of interest what are the differences in the approach between this
> and Glutten?
>
> Overall they are similar, although Gluten supports multiple backends
> including Velox and Clickhouse. One major difference is (obviously)
> Comet is based on DataFusion and Arrow, and written in Rust, while
> Gluten is mostly C++.
> I haven't looked very deep into Gluten yet, but there could be other
> differences such as how strictly the engine follows Spark's semantics,
> table format support (Iceberg, Delta, etc), fallback mechanism
> (coarse-grained fallback on stage level or more fine-grained fallback
> within stages), UDF support (Comet hasn't started on this yet),
> shuffle support, memory management, etc.
>
> Both engines are backed by very strong and vibrant open source
> communities (Velox, Clickhouse, Arrow & DataFusion) so it's very
> exciting to see how the projects will grow in future.
>
> Best,
> Chao
>
> On Tue, Feb 13, 2024 at 10:06 PM John Zhuge  wrote:
> >
> > Congratulations! Excellent work!
> >
> > On Tue, Feb 13, 2024 at 8:04 PM Yufei Gu  wrote:
> >>
> >> Absolutely thrilled to see the project going open-source! Huge congrats
> to Chao and the entire team on this milestone!
> >>
> >> Yufei
> >>
> >>
> >> On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:
> >>>
> >>> Hi all,
> >>>
> >>> We are very happy to announce that Project Comet, a plugin to
> >>> accelerate Spark query execution via leveraging DataFusion and Arrow,
> >>> has now been open sourced under the Apache Arrow umbrella. Please
> >>> check the project repo
> >>> https://github.com/apache/arrow-datafusion-comet for more details if
> >>> you are interested. We'd love to collaborate with people from the open
> >>> source community who share similar goals.
> >>>
> >>> Thanks,
> >>> Chao
> >>>
> >>> -
> >>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >>>
> >
> >
> > --
> > John Zhuge
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-14 Thread Chao Sun
> Out of interest what are the differences in the approach between this and 
> Glutten?

Overall they are similar, although Gluten supports multiple backends
including Velox and Clickhouse. One major difference is (obviously)
Comet is based on DataFusion and Arrow, and written in Rust, while
Gluten is mostly C++.
I haven't looked very deep into Gluten yet, but there could be other
differences such as how strictly the engine follows Spark's semantics,
table format support (Iceberg, Delta, etc), fallback mechanism
(coarse-grained fallback on stage level or more fine-grained fallback
within stages), UDF support (Comet hasn't started on this yet),
shuffle support, memory management, etc.

Both engines are backed by very strong and vibrant open source
communities (Velox, Clickhouse, Arrow & DataFusion) so it's very
exciting to see how the projects will grow in future.

Best,
Chao

On Tue, Feb 13, 2024 at 10:06 PM John Zhuge  wrote:
>
> Congratulations! Excellent work!
>
> On Tue, Feb 13, 2024 at 8:04 PM Yufei Gu  wrote:
>>
>> Absolutely thrilled to see the project going open-source! Huge congrats to 
>> Chao and the entire team on this milestone!
>>
>> Yufei
>>
>>
>> On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:
>>>
>>> Hi all,
>>>
>>> We are very happy to announce that Project Comet, a plugin to
>>> accelerate Spark query execution via leveraging DataFusion and Arrow,
>>> has now been open sourced under the Apache Arrow umbrella. Please
>>> check the project repo
>>> https://github.com/apache/arrow-datafusion-comet for more details if
>>> you are interested. We'd love to collaborate with people from the open
>>> source community who share similar goals.
>>>
>>> Thanks,
>>> Chao
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>
>
> --
> John Zhuge

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread John Zhuge
Congratulations! Excellent work!

On Tue, Feb 13, 2024 at 8:04 PM Yufei Gu  wrote:

> Absolutely thrilled to see the project going open-source! Huge congrats to
> Chao and the entire team on this milestone!
>
> Yufei
>
>
> On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:
>
>> Hi all,
>>
>> We are very happy to announce that Project Comet, a plugin to
>> accelerate Spark query execution via leveraging DataFusion and Arrow,
>> has now been open sourced under the Apache Arrow umbrella. Please
>> check the project repo
>> https://github.com/apache/arrow-datafusion-comet for more details if
>> you are interested. We'd love to collaborate with people from the open
>> source community who share similar goals.
>>
>> Thanks,
>> Chao
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>

-- 
John Zhuge


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread Yufei Gu
Absolutely thrilled to see the project going open-source! Huge congrats to
Chao and the entire team on this milestone!

Yufei


On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:

> Hi all,
>
> We are very happy to announce that Project Comet, a plugin to
> accelerate Spark query execution via leveraging DataFusion and Arrow,
> has now been open sourced under the Apache Arrow umbrella. Please
> check the project repo
> https://github.com/apache/arrow-datafusion-comet for more details if
> you are interested. We'd love to collaborate with people from the open
> source community who share similar goals.
>
> Thanks,
> Chao
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-02-13 Thread Mich Talebzadeh
You are getting a DiskChecker$DiskErrorException error when no new records are
published to Kafka for a few days. The error indicates that the Spark
application could not find a valid local directory to create temporary
files for data processing. This might be due to any of these:
- If no records are published to Kafka for a prolonged period, the S3
partition cleanup logic, enabled by default in S3AFileSystem, might have
removed the temporary directories used for writing batch data. When
processing resumes, a new temporary directory is needed, but the error
occurs due to insufficient space or permission issues (see below).
- Limited local disk space: ensure sufficient free space on the worker
nodes where Spark executors are running.
- Incorrectly configured spark.local.dir: verify that the spark.local.dir
property in your Spark configuration points to a writable directory with
enough space (see the sketch after this list).
- Permission issues: check that the directories listed in spark.local.dir are
accessible to the Spark user with read/write permissions.
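
A minimal PySpark sketch of pointing both Spark's local scratch space and the
S3A block buffer at directories with enough room (the mount points below are
assumptions, adjust to your nodes):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("s3a-local-dirs")
    # where Spark spills and shuffles locally; must be writable with free space
    .config("spark.local.dir", "/mnt/spark-local")
    # where the s3ablock-* buffer files from the stack trace are staged
    .config("spark.hadoop.fs.s3a.buffer.dir", "/mnt/s3a-buffer")
    .getOrCreate()
)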

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 13 Feb 2024 at 22:06, Abhishek Singla 
wrote:

> Hi Team,
>
> Could someone provide some insights into this issue?
>
> Regards,
> Abhishek Singla
>
> On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <
> abhisheksingla...@gmail.com> wrote:
>
>> Hi Team,
>>
>> Version: 3.2.2
>> Java Version: 1.8.0_211
>> Scala Version: 2.12.15
>> Cluster: Standalone
>>
>> I am using Spark Streaming to read from Kafka and write to S3. The job
>> fails with below error if there are no records published to Kafka for a few
>> days and then there are some records published. Could someone help me in
>> identifying the root cause of this job failure.
>>
>> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 
>> 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 
>> 0919e548-9706-4757-be94-359848100070] terminated with error
>> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any 
>> valid local directory for s3ablock-0001-
>>  at 
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>>  at 
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>>  at 
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>>  at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>>  at 
>> org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>>  at 
>> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>>  at 
>> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.(S3ABlockOutputStream.java:182)
>>  at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>>  at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>>  at 
>> org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>>  at 
>> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>>  at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>>  at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>>  at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>>  at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>>  at 
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>>  at 
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.(CheckpointFileManager.scala:140)
>>  at 
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.(CheckpointFileManager.scala:143)
>>  at 
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>>  at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>>  at 
>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>  at scala.Option.getOrElse(Option.scala:189)
>>  at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>>  at 
>> 

Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread Holden Karau
This looks really cool :) Out of interest, what are the differences in the
approach between this and Gluten?

On Tue, Feb 13, 2024 at 12:42 PM Chao Sun  wrote:

> Hi all,
>
> We are very happy to announce that Project Comet, a plugin to
> accelerate Spark query execution via leveraging DataFusion and Arrow,
> has now been open sourced under the Apache Arrow umbrella. Please
> check the project repo
> https://github.com/apache/arrow-datafusion-comet for more details if
> you are interested. We'd love to collaborate with people from the open
> source community who share similar goals.
>
> Thanks,
> Chao
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread Chao Sun
Hi all,

We are very happy to announce that Project Comet, a plugin to
accelerate Spark query execution via leveraging DataFusion and Arrow,
has now been open sourced under the Apache Arrow umbrella. Please
check the project repo
https://github.com/apache/arrow-datafusion-comet for more details if
you are interested. We'd love to collaborate with people from the open
source community who share similar goals.

Thanks,
Chao

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-02-13 Thread Bjørn Jørgensen
DiskChecker$DiskErrorException: Could not find any valid local directory
for s3ablock-0001-

out of space?
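
If so, a minimal sketch (the buffer path is only a placeholder) of pointing the
S3A block buffer at a volume with more free space, or buffering uploads in
memory instead of on local disk:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("s3a-buffer-config")
    # s3ablock-* files are created under fs.s3a.buffer.dir (by default a
    # directory under hadoop.tmp.dir), so it must point at a disk with space
    .config("spark.hadoop.fs.s3a.buffer.dir", "/mnt/big-volume/s3a")
    # alternative: buffer upload blocks in memory instead of on local disk
    # .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
    .getOrCreate()
)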

On Tue, 13 Feb 2024 at 21:24, Abhishek Singla <abhisheksingla...@gmail.com> wrote:

> Hi Team,
>
> Could someone provide some insights into this issue?
>
> Regards,
> Abhishek Singla
>
> On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <
> abhisheksingla...@gmail.com> wrote:
>
>> Hi Team,
>>
>> Version: 3.2.2
>> Java Version: 1.8.0_211
>> Scala Version: 2.12.15
>> Cluster: Standalone
>>
>> I am using Spark Streaming to read from Kafka and write to S3. The job
>> fails with below error if there are no records published to Kafka for a few
>> days and then there are some records published. Could someone help me in
>> identifying the root cause of this job failure.
>>
>> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 
>> 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 
>> 0919e548-9706-4757-be94-359848100070] terminated with error
>> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any 
>> valid local directory for s3ablock-0001-
>>  at 
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>>  at 
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>>  at 
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>>  at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>>  at 
>> org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>>  at 
>> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>>  at 
>> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
>>  at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>>  at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>>  at 
>> org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>>  at 
>> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>>  at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>>  at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>>  at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>>  at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>>  at 
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>>  at 
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>>  at 
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>>  at 
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>>  at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>>  at 
>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>  at scala.Option.getOrElse(Option.scala:189)
>>  at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>>  at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
>>  at 
>> org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
>>  at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>  at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>  at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>  at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
>>  at 
>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>>  at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>  at 
>> 

Re: Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-02-13 Thread Abhishek Singla
Hi Team,

Could someone provide some insights into this issue?

Regards,
Abhishek Singla

On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <
abhisheksingla...@gmail.com> wrote:

> Hi Team,
>
> Version: 3.2.2
> Java Version: 1.8.0_211
> Scala Version: 2.12.15
> Cluster: Standalone
>
> I am using Spark Streaming to read from Kafka and write to S3. The job
> fails with below error if there are no records published to Kafka for a few
> days and then there are some records published. Could someone help me in
> identifying the root cause of this job failure.
>
> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 
> 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 
> 0919e548-9706-4757-be94-359848100070] terminated with error
> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any 
> valid local directory for s3ablock-0001-
>   at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>   at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>   at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>   at 
> org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>   at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>   at 
> org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>   at 
> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>   at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>   at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>   at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>   at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>   at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>   at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>   at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>   at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at scala.Option.getOrElse(Option.scala:189)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
>   at 
> org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
>   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
>   at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   at 

Re: Null pointer exception while replaying WAL

2024-02-12 Thread Mich Talebzadeh
OK

Getting a null pointer exception while replaying the WAL! One possible reason is
that the messages RDD might contain null elements, and attempting to read
JSON from null values can result in an NPE. To handle this, you can add a
filter before processing the RDD to remove null elements.

msgs.foreachRDD { rdd =>
  if (rdd.take(1).nonEmpty) {
val messages: RDD[String] = rdd
  .map { sr =>
Option(sr).getOrElse("NO records found")
  }
  .filter(_ != "NO records found")

try {
  val messagesJson = spark.read.json(messages)
  messagesJson.write.mode("append").parquet(data)
} catch {
  case ex: Exception =>
ex.printStackTrace()
}
  }
}

This modification uses Option to handle potential null values in the rdd
and filters out any elements that are still "NO records found" after the
mapping operation.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 12 Feb 2024 at 14:22, nayan sharma  wrote:

>
> Please find below code
>
>  def main(args: Array[String]): Unit = {
> val config: Config = ConfigFactory.load()
> val streamC = StreamingContext.getOrCreate(
>   checkpointDirectory,
>   () => functionToCreateContext(config, checkpointDirectory)
> )
>
> streamC.start()
> streamC.awaitTermination()
>   }
>
>   def functionToCreateContext(config: Config, checkpointDirectory:
> String): StreamingContext = {
>
> val brokerUrl = config.getString("streaming.solace.brokerURL")
> val username = config.getString("streaming.solace.userName")
> val passwordSol = config.getString("streaming.solace.password")
> val vpn = config.getString("streaming.solace.vpn")
> val queue = config.getString("streaming.solace.queueName")
> val connectionFactory =
> config.getString("streaming.solace.connectionFactory")
>
>
>
> val spark = SparkSession
>   .builder()
>   .appName("rem-Streaming-Consumer")
>   .config("spark.streaming.receiver.writeAheadLog.enable", "true")
>   .config("spark.streaming.blockInterval", blockInterval)
>   .config("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>   .config("spark.streaming.receiver.writeAheadLog.enable", "true")
>.enableHiveSupport
>   .getOrCreate()
> val sc = spark.sparkContext
> val ssc = new StreamingContext(sc, Seconds(batchInterval))
> ssc.checkpoint(checkpointDirectory)
>
> val converter: Message => Option[String] = {
>   case msg: TextMessage =>
> Some(msg.getText)
>   case _ =>
> None
> }
>
> val props = new Properties()
> props.setProperty(
>   Context.INITIAL_CONTEXT_FACTORY,
>   "com.solacesystems.jndi.SolJNDIInitialContextFactory"
> )
> props.setProperty(Context.PROVIDER_URL, brokerUrl)
> props.setProperty(Context.SECURITY_PRINCIPAL, username)
> props.setProperty(Context.SECURITY_PRINCIPAL, passwordSol)
> props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)
>
> val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
>   ssc,
>
> JndiMessageConsumerFactory(props,QueueJmsDestinationInfo(queue), 
> connectionFactoryName
> = connectionFactory,messageSelector =
> ""),converter,1000, 1.second,10.second,StorageLevel.MEMORY_AND_DISK_SER_2 )
>
> msgs.foreachRDD(rdd =>
>   if (rdd.take(1).length > 0) {
> val messages: RDD[String] = rdd.map { sr =>
>   if (sr == null) {
> println("NO records found")
> "NO records found"
>   } else {
> println("Input Records from Solace queue : " + sr.toString)
> sr.toString
>   }
> }
> Thread.sleep(12)
> try{
>   val messagesJson = spark.read.json(messages) ===> getting NPE
> here after restarting using WAL
>   messagesJson.write.mode("append").parquet(data)
> }
> catch {
>   case ex => ex.printStackTrace()
> }
>   })
> ssc
>   }
> Thanks & Regards,
> Nayan Sharma
>  *+91-8095382952*
>
> 
> 
>
>
> On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> It is challenging to make a recommendation without further details. I am
>> guessing you are trying to build a fault-tolerant spark application (spark
>> structured streaming) that consumes messages from Solace?
>> To address *NullPointerException* in the 

Re: Null pointer exception while replaying WAL

2024-02-12 Thread nayan sharma
Please find below code

 def main(args: Array[String]): Unit = {
val config: Config = ConfigFactory.load()
val streamC = StreamingContext.getOrCreate(
  checkpointDirectory,
  () => functionToCreateContext(config, checkpointDirectory)
)

streamC.start()
streamC.awaitTermination()
  }

  def functionToCreateContext(config: Config, checkpointDirectory: String):
StreamingContext = {

val brokerUrl = config.getString("streaming.solace.brokerURL")
val username = config.getString("streaming.solace.userName")
val passwordSol = config.getString("streaming.solace.password")
val vpn = config.getString("streaming.solace.vpn")
val queue = config.getString("streaming.solace.queueName")
val connectionFactory =
config.getString("streaming.solace.connectionFactory")



val spark = SparkSession
  .builder()
  .appName("rem-Streaming-Consumer")
  .config("spark.streaming.receiver.writeAheadLog.enable", "true")
  .config("spark.streaming.blockInterval", blockInterval)
  .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
  .config("spark.streaming.receiver.writeAheadLog.enable", "true")
   .enableHiveSupport
  .getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(batchInterval))
ssc.checkpoint(checkpointDirectory)

val converter: Message => Option[String] = {
  case msg: TextMessage =>
Some(msg.getText)
  case _ =>
None
}

val props = new Properties()
props.setProperty(
  Context.INITIAL_CONTEXT_FACTORY,
  "com.solacesystems.jndi.SolJNDIInitialContextFactory"
)
props.setProperty(Context.PROVIDER_URL, brokerUrl)
props.setProperty(Context.SECURITY_PRINCIPAL, username)
props.setProperty(Context.SECURITY_PRINCIPAL, passwordSol)
props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)

val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
  ssc,

JndiMessageConsumerFactory(props,QueueJmsDestinationInfo(queue),
connectionFactoryName
= connectionFactory,messageSelector =
""),converter,1000, 1.second,10.second,StorageLevel.MEMORY_AND_DISK_SER_2 )

msgs.foreachRDD(rdd =>
  if (rdd.take(1).length > 0) {
val messages: RDD[String] = rdd.map { sr =>
  if (sr == null) {
println("NO records found")
"NO records found"
  } else {
println("Input Records from Solace queue : " + sr.toString)
sr.toString
  }
}
Thread.sleep(12)
try{
  val messagesJson = spark.read.json(messages) ===> getting NPE here
after restarting using WAL
  messagesJson.write.mode("append").parquet(data)
}
catch {
  case ex => ex.printStackTrace()
}
  })
ssc
  }
Thanks & Regards,
Nayan Sharma
 *+91-8095382952*





On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh 
wrote:

> Hi,
>
> It is challenging to make a recommendation without further details. I am
> guessing you are trying to build a fault-tolerant spark application (spark
> structured streaming) that consumes messages from Solace?
> To address the *NullPointerException* in the context of the provided
> information, you need to review the part of the code where the exception is
> thrown; identifying which object or method call is resulting in *null* can
> help the debugging process, along with checking the logs.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 10 Feb 2024 at 05:29, nayan sharma 
> wrote:
>
>> Hi Users,
>>
>> I am trying to build fault tolerant spark solace consumer.
>>
>> Issue :- we have to take restart of the job due to multiple issue load
>> average is one of them. At that time whatever spark is processing or
>> batches in the queue is lost. We can't replay it because we already had
>> send ack while calling store().
>>
>> Solution:- I have tried implementing WAL and checkpointing in the
>> solution. Job is able to identify the lost batches, records are not being
>> written in the log file but throwing NPE.
>>
>> We are creating sparkcontext using sc.getorcreate()
>>
>>
>> Thanks,
>> Nayan
>>
>


Re: Null pointer exception while replaying WAL

2024-02-11 Thread Mich Talebzadeh
Hi,

It is challenging to make a recommendation without further details. I am
guessing you are trying to build a fault-tolerant spark application (spark
structured streaming) that consumes messages from Solace?
To address the *NullPointerException* in the context of the provided
information, you need to review the part of the code where the exception is
thrown; identifying which object or method call is resulting in *null* can
help the debugging process, along with checking the logs.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 10 Feb 2024 at 05:29, nayan sharma  wrote:

> Hi Users,
>
> I am trying to build fault tolerant spark solace consumer.
>
> Issue :- we have to take restart of the job due to multiple issue load
> average is one of them. At that time whatever spark is processing or
> batches in the queue is lost. We can't replay it because we already had
> send ack while calling store().
>
> Solution:- I have tried implementing WAL and checkpointing in the
> solution. Job is able to identify the lost batches, records are not being
> written in the log file but throwing NPE.
>
> We are creating sparkcontext using sc.getorcreate()
>
>
> Thanks,
> Nayan
>


Null pointer exception while replaying WAL

2024-02-09 Thread nayan sharma
Hi Users,

I am trying to build fault tolerant spark solace consumer.

Issue :- we have to take restart of the job due to multiple issue load
average is one of them. At that time whatever spark is processing or
batches in the queue is lost. We can't replay it because we already had
send ack while calling store().

Solution:- I have tried implementing WAL and checkpointing in the solution.
Job is able to identify the lost batches, records are not being written in
the log file but throwing NPE.

We are creating sparkcontext using sc.getorcreate()


Thanks,
Nayan


Re: Building an Event-Driven Real-Time Data Processor with Spark Structured Streaming and API Integration

2024-02-09 Thread Mich Talebzadeh
The full code is available from the link below

https://github.com/michTalebzadeh/Event_Driven_Real_Time_data_processor_with_SSS_and_API_integration

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 9 Feb 2024 at 16:16, Mich Talebzadeh 
wrote:

> Appreciate your thoughts on this, Personally I think Spark Structured
> Streaming can be used effectively in an Event Driven Architecture  as well
> as  continuous streaming)
>
> From the link here
> 
>
> HTH,
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Building an Event-Driven Real-Time Data Processor with Spark Structured Streaming and API Integration

2024-02-09 Thread Mich Talebzadeh
Appreciate your thoughts on this, Personally I think Spark Structured
Streaming can be used effectively in an Event Driven Architecture  as well
as  continuous streaming)

>From the link here


HTH,

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


performance of union vs insert into

2024-02-08 Thread Manish Mehra
Hello,
I have an observation wherein performance of 'union' is lower when compared to 
multiple 'insert into' statements. Is this in line with Spark best practice?
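
To make the comparison concrete, a minimal sketch of the two patterns I am
comparing (table and view names are placeholders, assuming an existing
SparkSession 'spark'):

# Pattern A: a single INSERT of the unioned result
spark.sql("""
    INSERT INTO target_table
    SELECT * FROM source_view_1
    UNION ALL
    SELECT * FROM source_view_2
""")

# Pattern B: separate INSERT INTO statements, one per source
spark.sql("INSERT INTO target_table SELECT * FROM source_view_1")
spark.sql("INSERT INTO target_table SELECT * FROM source_view_2")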

Regards
Manish Mehra






[ANNOUNCE] Apache Celeborn(incubating) 0.4.0 available

2024-02-06 Thread Fu Chen
Hi all,

Apache Celeborn(Incubating) community is glad to announce the
new release of Apache Celeborn(Incubating) 0.4.0.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, high-efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

-
https://github.com/apache/incubator-celeborn/releases/tag/v0.4.0-incubating

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.4.0


Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Thanks,
Fu Chen
On behalf of the Apache Celeborn(incubating) community


Community over Code EU 2024 Travel Assistance Applications now open!

2024-02-03 Thread Gavin McDonald
Hello to all users, contributors and Committers!

The Travel Assistance Committee (TAC) are pleased to announce that
travel assistance applications for Community over Code EU 2024 are now
open!

We will be supporting Community over Code EU, Bratislava, Slovakia,
June 3rd - 5th, 2024.

TAC exists to help those that would like to attend Community over Code
events, but are unable to do so for financial reasons. For more info
on this year's applications and qualifying criteria, please visit the
TAC website at < https://tac.apache.org/ >. Applications are already
open on https://tac-apply.apache.org/, so don't delay!

The Apache Travel Assistance Committee will only be accepting
applications from those people that are able to attend the full event.

Important: Applications close on Friday, March 1st, 2024.

Applicants have until the closing date above to submit their
applications (which should contain as much supporting material as
required to efficiently and accurately process their request), this
will enable TAC to announce successful applications shortly
afterwards.

As usual, TAC expects to deal with a range of applications from a
diverse range of backgrounds; therefore, we encourage (as always)
anyone thinking about sending in an application to do so ASAP.

For those that will need a visa to enter the country, we advise you to apply
now so that you have enough time in case of interview delays. So do not
wait until you know if you have been accepted or not.

We look forward to greeting many of you in Bratislava, Slovakia in June,
2024!

Kind Regards,

Gavin

(On behalf of the Travel Assistance Committee)


Re: Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-31 Thread Mich Talebzadeh
I agree with what is stated. This is the gist of my understanding having
tested it.
When working with Spark Structured Streaming, each streaming query runs in
its own separate Spark session to ensure isolation and avoid conflicts
between different queries.
So here I have:

def process_data(self, df: F.DataFrame, batchId: int) -> None:
    if len(df.take(1)) > 0:
        df.select(col("timestamp"), col("value"), col("rowkey"), col("ID"),
                  col("CLUSTERED"), col("op_time")).show(1, False)
        df.createOrReplaceTempView("tmp_view")
        try:
            rows = df.sparkSession.sql("SELECT COUNT(1) FROM tmp_view").collect()[0][0]
            print(f"Number of rows: {rows}")
        except Exception as e:
            logging.error(f"Error counting rows: {e}")
    else:
        logging.warning("DataFrame is empty")

Here, df.sparkSession accesses the Spark session associated with the streaming
DataFrame 'df', so the SQL query can see the temp view created from it

+-----------------------+--------+------------------------------------+--------+---------+-------------------+
|timestamp              |value   |rowkey                              |ID      |CLUSTERED|op_time            |
+-----------------------+--------+------------------------------------+--------+---------+-------------------+
|2024-01-31 20:31:24.152|25754740|df4d864d-517d-4f59-8f9e-bd1e7cd9678f|25754740|2575473.9|2024-01-31 20:31:30|
+-----------------------+--------+------------------------------------+--------+---------+-------------------+
only showing top 1 row

rows is 50

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 31 Jan 2024 at 13:30, Karthick Nk  wrote:

> Hi Team,
>
> I am using structered streaming in pyspark in azure Databricks, in that I
> am creating temp_view from dataframe
> (df.createOrReplaceTempView('temp_view')) for performing spark sql query
> transformation.
> In that I am facing the issue that temp_view not found, so that as a
> workaround i have created global temp_view to use.
> But same when i have tried to create without streaming, i am able to
> perform the temp_view.
>
>
> write_to_final_table =
>  
> (spark.readStream.format('delta').option('ignoreChanges',True).table(f"{delta_table_name}")).writeStream.queryName(f"{query_name}").format("org.elasticsearch.spark.sql").trigger(processingTime=f'1
> minutes').outputMode("append").foreachBatch(process_micro_batch).option("checkpointLocation",checkpointdirectory_path).option("mergeSchema",
> "true").option("failOnDataLoss", "false").start()
>
>
> def process_micro_batch(micro_batch_df, batchId) :
> micro_batch_df.createOrReplaceTempView("temp_view")
> df = spark.sql(f"select * from temp_view")
> return df
>
> Here, I am getting error, while reading data from temp_view that temp_view
> not found error.
>
>
> I need to perform or create temp_view (*Not global temp_view)based on the
> dataframe, and need to perform the spark sql transformation in structered
> streaming.
>
> I have few question in my hand?
> 1. is strucutered streaming and spark.sql will have different
> spark.context within same databricks notebook?
> 2. If i want to create temp_view based on the dataframe and need to
> perform the spark sql operation, how can i create the tempview (Not global
> tempview, Since global temp view will be available in the cluster level
> across all the notebook)?
>
> Thanks & Regards
>


Re: Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-31 Thread Mich Talebzadeh
hm.

In your logic here

def process_micro_batch(micro_batch_df, batchId) :
micro_batch_df.createOrReplaceTempView("temp_view")
df = spark.sql(f"select * from temp_view")
return df

Is this function called and if so do you check if micro_batch_df contains
rows ->   if len(micro_batch_df.take(1)) > 0:

something like

# Modified process_data function to check for external trigger
def process_data(batch_df: F.DataFrame, batchId: int) -> None:
if len(batch_df.take(1)) > 0:
  # Check for external event trigger
  if listen_for_external_event():
# Assuming 'data' is a list of dictionaries obtained from the API
in each batch
api_data = get_api_data()
if api_data:
  dfAPI = spark_session.createDataFrame(api_data,
schema=data_schema)
  dfAPI = dfAPI \
 .withColumn("op_type", lit(udfs.op_type_api_udf())) \
 .withColumn("op_time", udfs.timestamp_udf(current_timestamp()))
  dfAPI.show(10, False)
else:
   logging.warning("Error getting API data.")
  else:
logging.info("No external trigger received.")
else:
logging.warning("DataFrame is empty")

# Streaming DataFrame Creation:
# construct a streaming dataframe that subscribes to topic rate for data
"""
This creates a streaming DataFrame by subscribing to a rate source.
It simulates a stream by generating data at a specified rate
(rowsPerSecond).
"""
streamingDataFrame = spark_session.readStream.format("rate") \
.option("rowsPerSecond", 100) \
.option("subscribe", "rate") \
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "latest") \
.load()

# Generate a unique query name by appending a timestamp
query_name = f"{appName}_{int(time.time())}"
logging.info(query_name)

# Main loop to continuously check for events
while True:
# Start the streaming query only if an external event is received
if listen_for_external_event():
query_name = f"{appName}_{int(time.time())}"
logging.info(query_name)
result_query = (
streamingDataFrame.writeStream
.outputMode('append')
.option("truncate", "false")
.foreachBatch(lambda df, batchId: process_data(df, batchId))
.trigger(processingTime=f'{processingTime} seconds')
.option('checkpointLocation', checkpoint_path)
.queryName(f"{query_name}")
.start()
)
break  # Exit the loop after starting the streaming query
else:
time.sleep(5)  # Sleep for a while before checking for the next
event

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 31 Jan 2024 at 13:30, Karthick Nk  wrote:

> Hi Team,
>
> I am using structered streaming in pyspark in azure Databricks, in that I
> am creating temp_view from dataframe
> (df.createOrReplaceTempView('temp_view')) for performing spark sql query
> transformation.
> In that I am facing the issue that temp_view not found, so that as a
> workaround i have created global temp_view to use.
> But same when i have tried to create without streaming, i am able to
> perform the temp_view.
>
>
> write_to_final_table =
>  
> (spark.readStream.format('delta').option('ignoreChanges',True).table(f"{delta_table_name}")).writeStream.queryName(f"{query_name}").format("org.elasticsearch.spark.sql").trigger(processingTime=f'1
> minutes').outputMode("append").foreachBatch(process_micro_batch).option("checkpointLocation",checkpointdirectory_path).option("mergeSchema",
> "true").option("failOnDataLoss", "false").start()
>
>
> def process_micro_batch(micro_batch_df, batchId) :
> micro_batch_df.createOrReplaceTempView("temp_view")
> df = spark.sql(f"select * from temp_view")
> return df
>
> Here, I am getting error, while reading data from temp_view that temp_view
> not found error.
>
>
> I need to perform or create temp_view (*Not global temp_view)based on the
> dataframe, and need to perform the spark sql transformation in structered
> streaming.
>
> I have few question in my hand?
> 1. is strucutered streaming and spark.sql will have different
> spark.context within same databricks notebook?
> 2. If i want to create temp_view based on the dataframe and need to
> perform the spark sql operation, how can i create the tempview (Not global
> tempview, Since global temp view will be available in the cluster 

deploy spark as cluster

2024-01-31 Thread ali sharifi
 Hi everyone! I followed this guide
https://dev.to/mvillarrealb/creating-a-spark-standalone-cluster-with-docker-and-docker-compose-2021-update-6l4
to create a Spark cluster on an Ubuntu server with Docker. However, when I
try to submit my PySpark code to the master, the jobs are registered in the
Spark UI but I encounter an error when checking the worker:

24/01/31 09:04:35 ERROR Inbox: Ignoring error
java.io.EOFException
at java.base/java.io.DataInputStream.readFully(Unknown Source)
at java.base/java.io.DataInputStream.readUTF(Unknown Source)
at java.base/java.io.DataInputStream.readUTF(Unknown Source)
at 
org.apache.spark.scheduler.TaskDescription$.deserializeStringLongMap(TaskDescription.scala:138)
at 
org.apache.spark.scheduler.TaskDescription$.decode(TaskDescription.scala:178)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:185)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at 
org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at 
org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
at java.base/java.lang.Thread.run(Unknown Source)


Could you please help me?  What should I do?


Create Custom Logs

2024-01-31 Thread PRASHANT L
Hi
I just wanted to check if there is a way to create custom logs in Spark.
I want to write selective/custom log messages to S3, running spark-submit
on EMR.
I would not want all the Spark-generated logs ... I would just need the
log messages that are logged as part of the Spark application.
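
For context, a minimal sketch of what I mean by application-level log messages
(the logger name is arbitrary); I assume shipping these to S3 would still need
a dedicated handler or appender configuration on top of this:

import logging

# A dedicated application logger, separate from Spark's own driver/executor logs
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
app_log = logging.getLogger("my_spark_app")
app_log.setLevel(logging.INFO)

app_log.info("processing started for batch %s", "2024-01-31")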


Re: Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-31 Thread Jungtaek Lim
Hi,

Streaming query clones the spark session - when you create a temp view from
DataFrame, the temp view is created under the cloned session. You will need
to use micro_batch_df.sparkSession to access the cloned session.
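
For example, a minimal sketch (function and view names are illustrative):

def process_micro_batch(micro_batch_df, batch_id):
    micro_batch_df.createOrReplaceTempView("temp_view")
    # spark.sql(...) would resolve the view against the outer session and fail;
    # micro_batch_df.sparkSession is the cloned session that owns the view.
    out_df = micro_batch_df.sparkSession.sql("SELECT * FROM temp_view")
    print(f"rows in this micro-batch: {out_df.count()}")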

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Jan 31, 2024 at 3:29 PM Karthick Nk  wrote:

> Hi Team,
>
> I am using structered streaming in pyspark in azure Databricks, in that I
> am creating temp_view from dataframe
> (df.createOrReplaceTempView('temp_view')) for performing spark sql query
> transformation.
> In that I am facing the issue that temp_view not found, so that as a
> workaround i have created global temp_view to use.
> But same when i have tried to create without streaming, i am able to
> perform the temp_view.
>
>
> write_to_final_table =
>  
> (spark.readStream.format('delta').option('ignoreChanges',True).table(f"{delta_table_name}")).writeStream.queryName(f"{query_name}").format("org.elasticsearch.spark.sql").trigger(processingTime=f'1
> minutes').outputMode("append").foreachBatch(process_micro_batch).option("checkpointLocation",checkpointdirectory_path).option("mergeSchema",
> "true").option("failOnDataLoss", "false").start()
>
>
> def process_micro_batch(micro_batch_df, batchId) :
> micro_batch_df.createOrReplaceTempView("temp_view")
> df = spark.sql(f"select * from temp_view")
> return df
>
> Here, I am getting error, while reading data from temp_view that temp_view
> not found error.
>
>
> I need to perform or create temp_view (*Not global temp_view)based on the
> dataframe, and need to perform the spark sql transformation in structered
> streaming.
>
> I have few question in my hand?
> 1. is strucutered streaming and spark.sql will have different
> spark.context within same databricks notebook?
> 2. If i want to create temp_view based on the dataframe and need to
> perform the spark sql operation, how can i create the tempview (Not global
> tempview, Since global temp view will be available in the cluster level
> across all the notebook)?
>
> Thanks & Regards
>


randomsplit has issue?

2024-01-31 Thread second_co...@yahoo.com.INVALID
based on this blog post 
https://sergei-ivanov.medium.com/why-you-should-not-use-randomsplit-in-pyspark-to-split-data-into-train-and-test-58576d539a36
 , I noticed a recommendation against using randomSplit for data splitting due 
to data sorting. Is the information provided in the blog accurate? I understand 
that the reason for data sorting is to partition the data using Spark. Could 
anyone clarify whether we should continue using randomSplit to divide our data 
into training and test sets or use filter() instead?
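
By filter() I mean something along the lines of this minimal sketch (assuming
an existing DataFrame df; the seed, fraction and column name are arbitrary):
tag each row once and cache, so the split cannot shift between evaluations.

from pyspark.sql import functions as F

tagged = df.withColumn("is_train", F.rand(seed=42) < F.lit(0.8)).cache()
train_df = tagged.filter(F.col("is_train")).drop("is_train")
test_df = tagged.filter(~F.col("is_train")).drop("is_train")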

Thank you




Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-30 Thread Karthick Nk
Hi Team,

I am using structered streaming in pyspark in azure Databricks, in that I
am creating temp_view from dataframe
(df.createOrReplaceTempView('temp_view')) for performing spark sql query
transformation.
In that I am facing the issue that temp_view not found, so that as a
workaround i have created global temp_view to use.
But same when i have tried to create without streaming, i am able to
perform the temp_view.


write_to_final_table =
 
(spark.readStream.format('delta').option('ignoreChanges',True).table(f"{delta_table_name}")).writeStream.queryName(f"{query_name}").format("org.elasticsearch.spark.sql").trigger(processingTime=f'1
minutes').outputMode("append").foreachBatch(process_micro_batch).option("checkpointLocation",checkpointdirectory_path).option("mergeSchema",
"true").option("failOnDataLoss", "false").start()


def process_micro_batch(micro_batch_df, batchId) :
micro_batch_df.createOrReplaceTempView("temp_view")
df = spark.sql(f"select * from temp_view")
return df

Here, I am getting error, while reading data from temp_view that temp_view
not found error.


I need to perform or create temp_view (*Not global temp_view)based on the
dataframe, and need to perform the spark sql transformation in structered
streaming.

I have few question in my hand?
1. is strucutered streaming and spark.sql will have different spark.context
within same databricks notebook?
2. If i want to create temp_view based on the dataframe and need to perform
the spark sql operation, how can i create the tempview (Not global
tempview, Since global temp view will be available in the cluster level
across all the notebook)?

Thanks & Regards


[Spark SQL]: Crash when attempting to select PostgreSQL bpchar without length specifier in Spark 3.5.0

2024-01-29 Thread Lily Hahn
Hi,

I’m currently migrating an ETL project to Spark 3.5.0 from 3.2.1 and ran into 
an issue with some of our queries that read from PostgreSQL databases.

Any attempt to run a Spark SQL query that selects a bpchar without a length 
specifier from the source DB seems to crash:
py4j.protocol.Py4JJavaError: An error occurred while calling 
o1061.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 
0) (192.168.1.48 executor driver): java.lang.OutOfMemoryError: Requested array 
size exceeds VM limit
at org.apache.spark.unsafe.types.UTF8String.rpad(UTF8String.java:880)
at 
org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.readSidePadding(CharVarcharCodegenUtils.java:62)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at 
org.apache.spark.executor.Executor$TaskRunner$$Lambda$2882/0x00080124d840.apply(Unknown
 Source)
at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

This appears to be the plan step related to the traceback:
staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, 
StringType, readSidePadding, bpcharcolumn#7, 2147483647, true, false, true)

Reading from a subquery and casting the column to varchar appears to work
around the issue.
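
For reference, a minimal sketch of that workaround using the JDBC query option
(assuming an existing SparkSession spark; connection details and table/column
names are placeholders):

df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/mydb")
    .option("query", "SELECT bpcharcolumn::varchar AS bpcharcolumn, id FROM my_table")
    .option("user", "my_user")
    .option("password", "my_password")
    .load()
)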

In PostgreSQL, the bpchar type acts as a variable unlimited length, 
blank-trimmed string if the length is omitted from the definition.

Is this an issue with Spark? I think the column is incorrectly getting 
interpreted as a char, which behaves the same way as a bpchar(n).


Re: startTimestamp doesn't work when using rate-micro-batch format

2024-01-29 Thread Mich Talebzadeh
As I stated earlier on, there are alternatives that you might explore, such as
socket sources for testing purposes.

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder \
.master("local[*]") \
.appName("StreamingSparkPartitioned") \
.getOrCreate()

expression = when(expr("value % 3 = 1"), "stupid_event") \
.otherwise(when(expr("value % 3 = 2"), "smart_event") \
.otherwise("neutral_event"))

# Define the schema to match the socket source data
schema = StructType([StructField("value", LongType())])
checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"

# Start a socket source for testing
socket_streamingDF = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", ) \
.load() \
.withColumn("value", expr("CAST(value AS LONG)")) \
.withColumn("event_type", expression)

query = (
socket_streamingDF.writeStream
.outputMode("append")
.format("console")
.trigger(processingTime="1 second")
.option("checkpointLocation", checkpoint_path)
.start()
)

query.awaitTermination()

In this example, it listens to a socket on localhost: and expects a
single integer value per line. You can use tools like netcat to send data
to this socket for testing.

echo "1" | nc -lk 

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 29 Jan 2024 at 11:33, Perfect Stranger 
wrote:

> Yes, there's definitely an issue, can someone fix it? I'm not familiar
> with apache jira, do I need to make a  bug report or what?
>
> On Mon, Jan 29, 2024 at 2:57 AM Mich Talebzadeh 
> wrote:
>
>> OK
>>
>> This is the equivalent Python code
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import expr, when
>> from pyspark.sql.types import StructType, StructField, LongType
>> from datetime import datetime
>>
>> spark = SparkSession.builder \
>> .master("local[*]") \
>> .appName("StreamingSparkPartitioned") \
>> .getOrCreate()
>>
>> expression = when(expr("value % 3 = 1"), "stupid_event") \
>> .otherwise(when(expr("value % 3 = 2"),
>> "smart_event").otherwise("neutral_event"))
>>
>> # Define the schema to match the rate-micro-batch data source
>> schema = StructType([StructField("timestamp", LongType()),
>> StructField("value", LongType())])
>> checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"
>>
>> # Convert human-readable timestamp to Unix timestamp in milliseconds
>> start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)
>>
>> streamingDF = spark.readStream \
>> .format("rate-micro-batch") \
>> .option("rowsPerBatch", "100") \
>> .option("startTimestamp", start_timestamp) \
>> .option("numPartitions", 1) \
>> .load() \
>> .withColumn("event_type", expression)
>>
>> query = (
>> streamingDF.writeStream
>> .outputMode("append")
>> .format("console")
>> .trigger(processingTime="1 second")
>> .option("checkpointLocation", checkpoint_path)
>> .start()
>> )
>>
>> query.awaitTermination()
>>
>> This is the error I am getting
>>   File
>> "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line
>> 38, in <module>
>> query.awaitTermination()
>>   File
>> "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line
>> 201, in awaitTermination
>>   File
>> "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line
>> 1322, in __call__
>>   File
>> "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py",
>> line 175, in deco
>> pyspark.errors.exceptions.captured.StreamingQueryException:
>> [STREAM_FAILED] Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId =
>> f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable
>> value for offset
>> Did not find value which can be converted into long
>>
>> Seems like there might be an issue with the *rate-micro-batch* source
>> when using the *startTimestamp* option.
>>
>> You can try using socket source for testing purposes
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data 

Re: startTimestamp doesn't work when using rate-micro-batch format

2024-01-29 Thread Perfect Stranger
Yes, there's definitely an issue, can someone fix it? I'm not familiar with
apache jira, do I need to make a  bug report or what?

On Mon, Jan 29, 2024 at 2:57 AM Mich Talebzadeh 
wrote:

> OK
>
> This is the equivalent Python code
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import expr, when
> from pyspark.sql.types import StructType, StructField, LongType
> from datetime import datetime
>
> spark = SparkSession.builder \
> .master("local[*]") \
> .appName("StreamingSparkPartitioned") \
> .getOrCreate()
>
> expression = when(expr("value % 3 = 1"), "stupid_event") \
> .otherwise(when(expr("value % 3 = 2"),
> "smart_event").otherwise("neutral_event"))
>
> # Define the schema to match the rate-micro-batch data source
> schema = StructType([StructField("timestamp", LongType()),
> StructField("value", LongType())])
> checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"
>
> # Convert human-readable timestamp to Unix timestamp in milliseconds
> start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)
>
> streamingDF = spark.readStream \
> .format("rate-micro-batch") \
> .option("rowsPerBatch", "100") \
> .option("startTimestamp", start_timestamp) \
> .option("numPartitions", 1) \
> .load() \
> .withColumn("event_type", expression)
>
> query = (
> streamingDF.writeStream
> .outputMode("append")
> .format("console")
> .trigger(processingTime="1 second")
> .option("checkpointLocation", checkpoint_path)
> .start()
> )
>
> query.awaitTermination()
>
> This is the error I am getting
>   File
> "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line
> 38, in <module>
> query.awaitTermination()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py",
> line 201, in awaitTermination
>   File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
> line 1322, in __call__
>   File
> "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py",
> line 175, in deco
> pyspark.errors.exceptions.captured.StreamingQueryException:
> [STREAM_FAILED] Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId =
> f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable
> value for offset
> Did not find value which can be converted into long
>
> Seems like there might be an issue with the *rate-micro-batch* source
> when using the *startTimestamp* option.
>
> You can try using socket source for testing purposes
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 28 Jan 2024 at 22:00, Perfect Stranger 
> wrote:
>
>> I described the issue here:
>>
>>
>> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>>
>> Could someone please respond?
>>
>> The rate-micro-batch format doesn't seem to respect the startTimestamp
>> option.
>>
>> Thanks.
>>
>


Re: startTimestamp doesn't work when using rate-micro-batch format

2024-01-28 Thread Mich Talebzadeh
OK

This is the equivalent Python code

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when
from pyspark.sql.types import StructType, StructField, LongType
from datetime import datetime

spark = SparkSession.builder \
.master("local[*]") \
.appName("StreamingSparkPartitioned") \
.getOrCreate()

expression = when(expr("value % 3 = 1"), "stupid_event") \
.otherwise(when(expr("value % 3 = 2"),
"smart_event").otherwise("neutral_event"))

# Define the schema to match the rate-micro-batch data source
schema = StructType([StructField("timestamp", LongType()),
StructField("value", LongType())])
checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"

# Convert human-readable timestamp to Unix timestamp in milliseconds
start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)

streamingDF = spark.readStream \
.format("rate-micro-batch") \
.option("rowsPerBatch", "100") \
.option("startTimestamp", start_timestamp) \
.option("numPartitions", 1) \
.load() \
.withColumn("event_type", expression)

query = (
streamingDF.writeStream
.outputMode("append")
.format("console")
.trigger(processingTime="1 second")
.option("checkpointLocation", checkpoint_path)
.start()
)

query.awaitTermination()

This is the error I am getting
  File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py",
line 38, in <module>
query.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py",
line 201, in awaitTermination
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
line 1322, in __call__
  File
"/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py",
line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED]
Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId =
f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable
value for offset
Did not find value which can be converted into long

Seems like there might be an issue with the *rate-micro-batch* source when
using the *startTimestamp* option.

You can try using socket source for testing purposes

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 28 Jan 2024 at 22:00, Perfect Stranger 
wrote:

> I described the issue here:
>
>
> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>
> Could someone please respond?
>
> The rate-micro-batch format doesn't seem to respect the startTimestamp
> option.
>
> Thanks.
>


startTimestamp doesn't work when using rate-micro-batch format

2024-01-28 Thread Perfect Stranger
I described the issue here:

https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format

Could someone please respond?

The rate-micro-batch format doesn't seem to respect the startTimestamp
option.

Thanks.


subscribe

2024-01-26 Thread Sahib Aulakh
subscribe


subscribe

2024-01-26 Thread Sahib Aulakh



Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-24 Thread Andrzej Zera
Hi,

I'm sorry but I got confused about the inner workings of late events
watermark. You're completely right. Thanks for clarifying.

Regards,
Andrzej

czw., 11 sty 2024 o 13:02 Jungtaek Lim 
napisał(a):

> Hi,
>
> The time window is closed and evicted as long as "eviction watermark"
> passes the end of the window. Late events watermark only deals with
> discarding late events from "inputs". We did not introduce additional delay
> on the work of multiple stateful operators. We just allowed more late
> events to be accepted.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jan 11, 2024 at 6:13 AM Andrzej Zera 
> wrote:
>
>> I'm struggling with the following issue in Spark >=3.4, related to
>> multiple stateful operations.
>>
>> When spark.sql.streaming.statefulOperator.allowMultiple is enabled,
>> Spark keeps track of two types of watermarks:
>> eventTimeWatermarkForEviction and eventTimeWatermarkForLateEvents.
>> Introducing them allowed chaining multiple stateful operations but also
>> introduced an additional delay for getting the output out of the streaming
>> query.
>>
>> I'll show this on the example. Assume we have a stream of click events
>> and we aggregate it first by 1-min window and then by 5-min window. If we
>> have a trigger interval of 30s, then in most cases we'll get output 30s
>> later compared to single stateful operations queries. To find out how,
>> let's look at the following examples:
>>
>> Example 1. Single stateful operation (aggregation by 5-min window, assume
>> watermark is 0 seconds)
>>
>> Wall clock (microbatch     Max event timestamp at the        Global       Output
>> processing starts)         time of getting data from Kafka   watermark
>> 14:10:00                   14:09:56                          0            -
>> 14:10:30                   14:10:26                          14:09:56     -
>> 14:11:00                   14:10:56                          14:10:26     window <14:05, 14:10)
>>
>> Example 2. Multiple stateful operations (aggregation by 1-min window
>> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>>
>> Wall clock (microbatch     Max event timestamp at the        Late events   Eviction     Output
>> processing starts)         time of getting data from Kafka   watermark     watermark
>> 14:10:00                   14:09:56                          0             0            -
>> 14:10:30                   14:10:26                          0             14:09:56     -
>> 14:11:00                   14:10:56                          14:09:56      14:10:26     -
>> 14:11:30                   14:11:26                          14:10:26      14:10:56     window <14:05, 14:10)
>>
>> In Example 2, we need to wait until both watermarks cross the end of the
>> window to get the output for that window, which happens one iteration later
>> compared to Example 1.
>>
>> Now, in use cases that require near-real-time processing, this one
>> iteration delay can be quite a significant difference.
>>
>> Do we have any option to make streaming queries with multiple stateful
>> operations output data without waiting this extra iteration? One of my
>> ideas was to force an empty microbatch to run and propagate late events
>> watermark without any new data. While this conceptually works, I didn't
>> find a way to trigger an empty microbatch while being connected to Kafka
>> that constantly receives new data and while having a constant 30s trigger
>> interval.
>>
>> Thanks,
>> Andrzej
>>
>
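
For reference, a minimal sketch of the chained windowed aggregation discussed
above (Spark 3.4+, multiple stateful operators enabled); the source, rates and
column names are only illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, window_time, count, sum as sum_

spark = SparkSession.builder.appName("ChainedWindowedAggs").getOrCreate()
spark.conf.set("spark.sql.streaming.statefulOperator.allowMultiple", "true")

# Illustrative stream of click events with an event-time column
clicks = (spark.readStream
          .format("rate")                 # placeholder source for the sketch
          .option("rowsPerSecond", 10)
          .load()
          .withColumnRenamed("timestamp", "eventTime"))

# First stateful operator: count clicks per 1-minute window
per_minute = (clicks
              .withWatermark("eventTime", "0 seconds")
              .groupBy(window("eventTime", "1 minute"))
              .agg(count("*").alias("clicks")))

# Second stateful operator: roll the 1-minute windows up into 5-minute windows
per_five_min = (per_minute
                .groupBy(window(window_time("window"), "5 minutes"))
                .agg(sum_("clicks").alias("clicks")))

query = (per_five_min.writeStream
         .outputMode("append")
         .format("console")
         .trigger(processingTime="30 seconds")
         .start())

query.awaitTermination()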


Some optimization questions about our beloved engine Spark

2024-01-23 Thread Aissam Chia
Hi,

I hope this email finds you well.

Currently, I'm working on Spark SQL and I have two main questions that I've
been struggling with for two weeks now. I'm running Spark on AWS EMR:

   1. I'm running 30 Spark applications on the same cluster. The
   applications are basically SQL transformations computed on data stored
   on S3. I monitored the cluster using Ganglia and noticed that most of
   the time I'm using roughly 2% of the total CPU. How can I
   optimize/maximize resource usage?
   2. I have a Spark application that computes approximately 2 TB of
   data. I'm using 50 r6g.2xlarge EC2 instances and 90% of the total CPU
   capacity. My SQL job consists of joining a timeseries table with another
   table that I broadcast to the different nodes. Is it possible to control
   which data is loaded onto each node? For example, I want each node to
   group the rows with the same joining key (see the sketch below).
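
For question 1, low aggregate CPU usage usually points at executor sizing,
parallelism, or dynamic allocation settings rather than the SQL itself. For
question 2, below is a minimal sketch of the pattern described, with
hypothetical S3 paths and column names: the small table is broadcast so every
node holds a full copy, and the large timeseries table is repartitioned on the
join key so that rows sharing a key end up in the same partition.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast-join-sketch").getOrCreate()

# Hypothetical inputs on S3
timeseries = spark.read.parquet("s3://my-bucket/timeseries/")   # large table
dimension = spark.read.parquet("s3://my-bucket/dimension/")     # small table

# Group rows with the same joining key into the same partition of the large table
timeseries = timeseries.repartition("join_key")

# Broadcast the small side so the large table is not shuffled for the join itself
joined = timeseries.join(broadcast(dimension), on="join_key", how="inner")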

Thank you for your time.

Kind regards,
Aissam CHIA


Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-01-17 Thread Abhishek Singla
Hi Team,

Version: 3.2.2
Java Version: 1.8.0_211
Scala Version: 2.12.15
Cluster: Standalone

I am using Spark Streaming to read from Kafka and write to S3. The job
fails with the error below if no records are published to Kafka for a few
days and then some records are published. Could someone help me identify
the root cause of this job failure?

24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id =
72ee1070-7e05-4999-8b55-2a99e216ec51, runId =
0919e548-9706-4757-be94-359848100070] terminated with error
org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find
any valid local directory for s3ablock-0001-
at 
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
at 
org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
at 
org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
at 
org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
at 
org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
at 
org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
at 
org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
at 
org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at scala.Option.getOrElse(Option.scala:189)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
at 
org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at 
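
One thing worth checking (an assumption, not a confirmed root cause): the stack
trace shows the S3A output stream failing to allocate a local buffer file via
LocalDirAllocator, so it is worth confirming that the configured buffer
directories exist and have free space, or switching to in-memory buffering. A
sketch of the standard Hadoop S3A options as they can be passed through the
Spark session builder (the path is a placeholder):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("s3a-buffer-sketch")
         # Local directory (or comma-separated list) used to buffer blocks before upload
         .config("spark.hadoop.fs.s3a.buffer.dir", "/mnt/tmp/s3a")
         # Alternatively, buffer uploads in memory instead of on local disk:
         # .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
         .getOrCreate())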

Re: unsubscribe

2024-01-17 Thread Крюков Виталий Семенович
unsubscribe


From: Leandro Martelli 
Sent: 17 January 2024, 1:53:04
To: user@spark.apache.org
Subject: unsubscribe

unsubscribe


unsubscribe

2024-01-16 Thread Leandro Martelli
unsubscribe


Unsubscribe

2024-01-13 Thread Andrew Redd
Unsubscribe

