Spark-SQL : Getting current user name in UDF

2022-02-21 Thread Lavelle, Shawn
Hello Spark Users,

I have a UDF I wrote for use with Spark-SQL that performs a lookup. In
that lookup, I need to get the current SQL user so I can validate their
permissions. I was using org.apache.spark.sql.util.Utils.getCurrentUserName()
to retrieve the current active user from within the UDF, but today I discovered
that the call returns a different user depending on the context:

select myUDF();
returns the SQL user

select myUDF() from myTable;
returns the operating system (application?) user.

I can provide a code example if needed, but it's just calling 
Utils.getCurrentUserName() from within the UDF code.

Does this sound like expected behavior or a defect?  Is there another way I can 
get the active SQL user inside a UDF?

Thanks in advance,

~ Shawn

PS: I can't add the username as a parameter to the UDF because I can't rely on
the user not to submit someone else's username.
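
For reference, a minimal PySpark sketch of the two call paths above (myUDF is
assumed to be already registered and to call Utils.getCurrentUserName()
internally; the comments only restate the behaviour reported above, not
documented semantics):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("udf-user-repro").getOrCreate()

# Constant-only projection: reported to return the SQL user.
spark.sql("SELECT myUDF() AS who").show()

# Projection over a table: reported to return the operating system
# (application) user instead.
spark.sql("SELECT myUDF() AS who FROM myTable").show()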






Re: StructuredStreaming - foreach/foreachBatch

2022-02-21 Thread karan alang
Thanks, Gourav - will check out the book.

regds,
Karan Alang

On Thu, Feb 17, 2022 at 9:05 AM Gourav Sengupta 
wrote:

> Hi,
>
> The following excellent documentation may help as well:
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> The book from Dr. Zaharia on SPARK does a fantastic job in explaining the
> fundamental thinking behind these concepts.
>
>
> Regards,
> Gourav Sengupta
>
>
>
> On Wed, Feb 9, 2022 at 8:51 PM karan alang  wrote:
>
>> Thanks, Mich .. will check it out
>>
>> regds,
>> Karan Alang
>>
>> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh 
>> wrote:
>>
>>> BTW you can check this Linkedin article of mine on Processing Change
>>> Data Capture with Spark Structured Streaming
>>> 
>>>
>>>
>>> It covers the concept of triggers including trigger(once = True) or
>>> one-time batch in Spark Structured Streaming
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>> On Mon, 7 Feb 2022 at 23:06, karan alang  wrote:
>>>
 Thanks, Mich .. that worked fine!


 On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> read below
>
> """
>"foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> SendToBigQuery function
> foreachBatch(SendToBigQuery) expects 2 parameters,
> first: micro-batch as DataFrame or Dataset and second: unique id for each
> batch --> batchId
>Using foreachBatch, we write each micro batch to
> storage defined in our custom logic. In this case, we store the output of
> our streaming application to Google BigQuery table.
>Note that we are appending data and column "rowkey" is
> defined as UUID so it can be used as the primary key
> """
> result = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.ticker").alias("ticker") \
>,
> col("parsed_value.timeissued").alias("timeissued") \
>, col("parsed_value.price").alias("price")). \
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  foreachBatch(SendToBigQuery). \
>  trigger(processingTime='2 seconds'). \
>  start()
>
> now you define your function SendToBigQuery()
>
>
> def SendToBigQuery(df, batchId):
>
> if(len(df.take(1))) > 0:
>
> df.printSchema()
>
> print(f"""batchId is {batchId}""")
>
> rows = df.count()
>
> print(f""" Total records processed in this run = {rows}""")
>
> ..
>
> else:
>
> print("DataFrame is empty")
>
> HTH
>
>
>
> On Mon, 7 Feb 2022 at 21:06, karan alang 
> wrote:
>
>> Hello All,
>>
>> I'm using StructuredStreaming to read data from Kafka, and need to do
>> transformation on each individual row.
>>
>> I'm trying to use 'foreach' (or foreachBatch), and running into
>> issues.
>> Basic question - how is the row passed to the function when foreach
>> is used ?
>>
>> Also, when I use foreachBatch, seems the BatchId is available in the
>> function called ? How do I access individual rows ?
>>
>> Details are in stackoverflow :
>>
>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>
>> What is the best approach for this use-case ?
>>
>> tia!
>>
>
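
To make the row-level question above concrete, here is a minimal, hedged
PySpark sketch of both sinks. The Kafka options, topic and processing logic are
placeholders and would need to match the actual job (the Kafka source also
needs the spark-sql-kafka connector on the classpath).

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("foreach-vs-foreachBatch").getOrCreate()

stream_df = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
             .option("subscribe", "my_topic")                      # placeholder
             .load())

# foreach: the supplied function is called once per Row of every micro-batch.
def handle_row(row):
    # row is a pyspark.sql.Row; for the Kafka source, row.value holds the payload
    print(row.value)

# foreachBatch: the supplied function is called once per micro-batch and
# receives (DataFrame, batch_id); individual rows are then reached through
# normal DataFrame operations on that batch.
def handle_batch(batch_df, batch_id):
    for row in batch_df.collect():   # fine for small batches only
        print(batch_id, row.value)

query = stream_df.writeStream.foreach(handle_row).start()
# or: query = stream_df.writeStream.foreachBatch(handle_batch).start()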


Re: StructuredStreaming - foreach/foreachBatch

2022-02-21 Thread Danilo Sousa
Hello Gourav,

I'll read this document.


Thanks.

> On 17 Feb 2022, at 14:05, Gourav Sengupta  wrote:
> 
> Hi,
> 
> The following excellent documentation may help as well: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>  
> 
>  
> 
> The book from Dr. Zaharia on SPARK does a fantastic job in explaining the 
> fundamental thinking behind these concepts.
> 
> 
> Regards,
> Gourav Sengupta
> 
> 
> 
> On Wed, Feb 9, 2022 at 8:51 PM karan alang wrote:
> Thanks, Mich .. will check it out
> 
> regds,
> Karan Alang
> 
> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh wrote:
> BTW you can check this Linkedin article of mine on Processing Change Data 
> Capture with Spark Structured Streaming 
> 
> 
> It covers the concept of triggers including trigger(once = True) or one-time 
> batch in Spark Structured Streaming
> 
> HTH
> 
> 
> 
> On Mon, 7 Feb 2022 at 23:06, karan alang wrote:
> Thanks, Mich .. that worked fine!
> 
> 
> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh wrote:
> read below
> 
> """
>"foreach" performs custom write logic on each row and 
> "foreachBatch" performs custom write logic on each micro-batch through 
> SendToBigQuery function
> foreachBatch(SendToBigQuery) expects 2 parameters, first: 
> micro-batch as DataFrame or Dataset and second: unique id for each batch --> 
> batchId
>Using foreachBatch, we write each micro batch to storage 
> defined in our custom logic. In this case, we store the output of our 
> streaming application to Google BigQuery table.
>Note that we are appending data and column "rowkey" is defined 
> as UUID so it can be used as the primary key
> """
> result = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.ticker").alias("ticker") \
>, col("parsed_value.timeissued").alias("timeissued") \
>, col("parsed_value.price").alias("price")). \
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  foreachBatch(SendToBigQuery). \
>  trigger(processingTime='2 seconds'). \
>  start()
> 
> now you define your function SendToBigQuery() 
> 
> def SendToBigQuery(df, batchId):
> if(len(df.take(1))) > 0:
> df.printSchema()
> print(f"""batchId is {batchId}""")
> rows = df.count()
> print(f""" Total records processed in this run = {rows}""")
> ..
> else:
> print("DataFrame is empty")
> 
> HTH
> 
> 
> 
> On Mon, 7 Feb 2022 at 21:06, karan alang wrote:
> Hello All,
> 
> I'm using StructuredStreaming to read data from Kafka, and need to do 
> transformation on each individual row.
> 
> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
> Basic question - how is the row passed to the function when foreach is used ?
> 
> Also, when I use foreachBatch, seems the BatchId is available in the function 
> called ? How do I access individual rows ?
> 
> Details are in stackoverflow :   
> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>  
> 
> 
> What is the best approach for this use-case ?
> 
> tia!



Re: Help With unstructured text file with spark scala

2022-02-21 Thread Danilo Sousa
Yes, this is only a single file.

Thanks, Rafael Mendes.

> On 13 Feb 2022, at 07:13, Rafael Mendes  wrote:
> 
> Hi, Danilo.
> Do you have a single large file, only?
> If so, I guess you can use tools like sed/awk to split it into more files 
> based on layout, so you can read these files into Spark.
> 
> 
> On Wed, Feb 9, 2022 at 09:30, Bitfox wrote:
> Hi
> 
> I am not sure about the total situation.
> But if you want a scala integration I think it could use regex to match and 
> capture the keywords.
> Here I wrote one you can modify by your end.
> 
> import scala.io.Source
> import scala.collection.mutable.ArrayBuffer
> 
> val list1 = ArrayBuffer[(String,String,String)]()
> val list2 = ArrayBuffer[(String,String)]()
> 
> 
> val patt1 = """^(.*)#(.*)#([^#]*)$""".r
> val patt2 = """^(.*)#([^#]*)$""".r
> 
> val file = "1.txt"
> val lines = Source.fromFile(file).getLines()
> 
> for ( x <- lines ) {
>   x match {
> case patt1(k,v,z) => list1 += ((k,v,z))
> case patt2(k,v) => list2 += ((k,v))
> case _ => println("no match")
>   }
> }
> 
> 
> Now the list1 and list2 have the elements you wanted, you can convert them to 
> a dataframe easily.
> 
> Thanks.
> 
> On Wed, Feb 9, 2022 at 7:20 PM Danilo Sousa wrote:
> Hello
> 
> 
> Yes, this block I can open as CSV with a # delimiter, but there is another
> block that is not in CSV format.
>
> That one looks more like key-value pairs.
>
> We have two different layouts in the same file. That is the “problem”.
> 
> Thanks for your time.
> 
> 
> 
>> Relação de Beneficiários Ativos e Excluídos
>> Carteira em#27/12/2019##Todos os Beneficiários
>> Operadora#AMIL
>> Filial#SÃO PAULO#Unidade#Guarulhos
>> 
>> Contrato#123456 - Test
>> Empresa#Test
> 
>> On 9 Feb 2022, at 00:58, Bitfox wrote:
>> 
>> Hello
>> 
>> You can treat it as a CSV file and load it from Spark:
>> 
>> >>> df = spark.read.format("csv").option("inferSchema", 
>> >>> "true").option("header", "true").option("sep","#").load(csv_file)
>> >>> df.show()
>> ++---+-+
>> |   Plano|Código Beneficiário|Nome Beneficiário|
>> ++---+-+
>> |58693 - NACIONAL ...|   65751353|   Jose Silva|
>> |58693 - NACIONAL ...|   65751388|  Joana Silva|
>> |58693 - NACIONAL ...|   65751353| Felipe Silva|
>> |58693 - NACIONAL ...|   65751388|  Julia Silva|
>> ++---+-+
>> 
>> 
>> cat csv_file:
>> 
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>> 
>> 
>> Regards
>> 
>> 
>> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa wrote:
>> Hi
>> I have to transform unstructured text to dataframe.
>> Could anyone please help with Scala code ?
>> 
>> Dataframe need as:
>> 
>> operadora filial unidade contrato empresa plano codigo_beneficiario 
>> nome_beneficiario
>> 
>> Relação de Beneficiários Ativos e Excluídos
>> Carteira em#27/12/2019##Todos os Beneficiários
>> Operadora#AMIL
>> Filial#SÃO PAULO#Unidade#Guarulhos
>> 
>> Contrato#123456 - Test
>> Empresa#Test
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>> 
>> Contrato#898011000 - FUNDACAO GERDAU
>> Empresa#FUNDACAO GERDAU
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>> 
>> 
> 
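
Since everything sits in one file, here is a hedged PySpark sketch of the same
line-by-line approach as the Scala example above, applied to the mixed-layout
sample quoted in this thread. The column names, regex patterns and file path
are assumptions; header lines with more than one '#'-separated pair are simply
skipped here.

import re
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mixed-layout-parse").getOrCreate()

detail_rows, header_rows = [], []
detail_re = re.compile(r"^(.*)#(\d+)#([^#]+)$")   # plano#codigo#nome lines
kv_re = re.compile(r"^([^#]+)#([^#]+)$")          # simple key#value lines

with open("report.txt", encoding="utf-8") as f:   # path is a placeholder
    for raw in f:
        line = raw.strip()
        m = detail_re.match(line)
        if m:
            detail_rows.append(m.groups())
            continue
        m = kv_re.match(line)
        if m:
            header_rows.append(m.groups())

# Assumes both lists are non-empty for the sample data shown above.
details_df = spark.createDataFrame(
    detail_rows, ["plano", "codigo_beneficiario", "nome_beneficiario"])
headers_df = spark.createDataFrame(header_rows, ["chave", "valor"])
details_df.show(truncate=False)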



Re: Question about spark.sql min_by

2022-02-21 Thread Mich Talebzadeh
I gave a similar answer about windowing functions in the thread "add an
auto_increment column" dated 7th February:

https://lists.apache.org/list.html?user@spark.apache.org

HTH







On Mon, 21 Feb 2022 at 15:41, David Diebold  wrote:

> Thank you for your answers.
> Indeed windowing should help there.
> Also, I just realized maybe I can try to create a struct column with both
> price and sellerId and apply min() on it, ordering would consider price
> first for the ordering (https://stackoverflow.com/a/52669177/2015762)
>
> Cheers!
>
> On Mon, Feb 21, 2022 at 16:12, ayan guha wrote:
>
>> Why can this not be done with a window function? Or is min_by just
>> shorthand?
>>
>> On Tue, 22 Feb 2022 at 12:42 am, Sean Owen  wrote:
>>
>>> From the source code, looks like this function was added to pyspark in
>>> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
>>> SQL with `spark.sql(...)` in Python though, not hard.
>>>
>>> On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
>>> wrote:
>>>
 Hello all,

 I'm trying to use the spark.sql min_by aggregation function with
 pyspark.
 I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2

 I have a dataframe made of these columns:
 - productId : int
 - sellerId : int
 - price : double

 For each product, I want to get the seller who sells the product for
 the cheapest price.

 Naive approach would be to do this, but I would expect two shuffles:

 import spark.sql.functions as F
 cheapest_prices_df  =
 df.groupby('productId').agg(F.min('price').alias('price'))
 cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
 'price'])

 I would have loved to do this instead:

 import spark.sql.functions as F
 cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
 F.min_by('sellerId', 'price'))

 Unfortunately min_by does not seem available in pyspark sql functions,
 whereas I can see it in the doc :
 https://spark.apache.org/docs/latest/api/sql/index.html

 I have managed to use min_by with this approach but it looks slow
 (maybe because of temp table creation ?):

 df.createOrReplaceTempView("table")
 cheapest_sellers_df = spark.sql("select min_by(sellerId, price)
 sellerId, min(price) from table group by productId")

 Is there a way I can rely on min_by directly in groupby ?
 Is there some code missing in pyspark wrapper to make min_by visible
 somehow ?

 Thank you in advance for your help.

 Cheers
 David

>>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: Logging to determine why driver fails

2022-02-21 Thread Artemis User
Another little-known issue I'd like to mention, as we ran into it in the past:
Spark 3.2.1 is bundled with log4j version 1.2.17. That jar file is missing some
APIs (e.g. RollingFileAppender), and you may encounter ClassNotFound
exceptions. To resolve that issue, download the apache-log4j-extras-1.2.17.jar
file (it should be downloadable from Apache's web site) and place it in Spark's
jars directory.


Not sure why Spark didn't bundle the extras jar with the package. Hopefully
this little bug will be resolved in the next release...


On 2/21/22 9:37 AM, Michael Williams (SSI) wrote:


Thank you.

From: Artemis User [mailto:arte...@dtechspace.com]
Sent: Monday, February 21, 2022 8:23 AM
To: Michael Williams (SSI)
Subject: Re: Logging to determine why driver fails

Spark uses Log4j for logging.  There is a log4j properties template 
file located in the conf directory.  You can use that as a reference 
to turn on logging.  More info on log4j can be found on the 
logging.apache.org web site.


On 2/21/22 9:15 AM, Michael Williams (SSI) wrote:

Hello,

We have a POC using Spark 3.2.1 and none of us have any prior
Spark experience.  Our setup uses the native Spark REST api
(http://localhost:6066/v1/submissions/create)
on the master node (not Livy, not Spark Job server).  While we
have been successful at submitting python jobs via this endpoint,
when we implemented .NET for Spark and have been attempting to
trigger those jobs using the api, the driver (on the worker)
simply reports failed, but there aren’t any log files created
because it is failing before the application starts.

Is there a logging configuration that can be made that might
increase the logging detail on the worker for internal Spark
processes and possibly tell us specifically the error occurring?

Thank you,

Mike



Re: Question about spark.sql min_by

2022-02-21 Thread David Diebold
Thank you for your answers.
Indeed windowing should help there.
Also, I just realized maybe I can try to create a struct column with both
price and sellerId and apply min() to it; the ordering would consider price
first (https://stackoverflow.com/a/52669177/2015762).
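
For what it's worth, a hedged sketch of that struct-based idea (single
aggregation, no join; min() on a struct orders by its first field, so price
decides and sellerId only breaks ties):

import pyspark.sql.functions as F

cheapest_sellers_df = (
    df.groupBy("productId")
      .agg(F.min(F.struct("price", "sellerId")).alias("cheapest"))
      .select(
          "productId",
          F.col("cheapest.price").alias("price"),
          F.col("cheapest.sellerId").alias("sellerId"),
      )
)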

Cheers!

On Mon, Feb 21, 2022 at 16:12, ayan guha wrote:

> Why can this not be done with a window function? Or is min_by just
> shorthand?
>
> On Tue, 22 Feb 2022 at 12:42 am, Sean Owen  wrote:
>
>> From the source code, looks like this function was added to pyspark in
>> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
>> SQL with `spark.sql(...)` in Python though, not hard.
>>
>> On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
>> wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to use the spark.sql min_by aggregation function with pyspark.
>>> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>>>
>>> I have a dataframe made of these columns:
>>> - productId : int
>>> - sellerId : int
>>> - price : double
>>>
>>> For each product, I want to get the seller who sells the product for the
>>> cheapest price.
>>>
>>> Naive approach would be to do this, but I would expect two shuffles:
>>>
>>> import spark.sql.functions as F
>>> cheapest_prices_df  =
>>> df.groupby('productId').agg(F.min('price').alias('price'))
>>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
>>> 'price'])
>>>
>>> I would have loved to do this instead:
>>>
>>> import spark.sql.functions as F
>>> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
>>> F.min_by('sellerId', 'price'))
>>>
>>> Unfortunately min_by does not seem available in pyspark sql functions,
>>> whereas I can see it in the doc :
>>> https://spark.apache.org/docs/latest/api/sql/index.html
>>>
>>> I have managed to use min_by with this approach but it looks slow (maybe
>>> because of temp table creation ?):
>>>
>>> df.createOrReplaceTempView("table")
>>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price)
>>> sellerId, min(price) from table group by productId")
>>>
>>> Is there a way I can rely on min_by directly in groupby ?
>>> Is there some code missing in pyspark wrapper to make min_by visible
>>> somehow ?
>>>
>>> Thank you in advance for your help.
>>>
>>> Cheers
>>> David
>>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Question about spark.sql min_by

2022-02-21 Thread ayan guha
Why can this not be done with a window function? Or is min_by just
shorthand?
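
For completeness, a minimal sketch of what the window-function version could
look like (ties on price broken arbitrarily by row_number):

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("productId").orderBy(F.col("price").asc())

cheapest_sellers_df = (
    df.withColumn("rn", F.row_number().over(w))
      .filter(F.col("rn") == 1)
      .drop("rn")
)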

On Tue, 22 Feb 2022 at 12:42 am, Sean Owen  wrote:

> From the source code, looks like this function was added to pyspark in
> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
> SQL with `spark.sql(...)` in Python though, not hard.
>
> On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
> wrote:
>
>> Hello all,
>>
>> I'm trying to use the spark.sql min_by aggregation function with pyspark.
>> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>>
>> I have a dataframe made of these columns:
>> - productId : int
>> - sellerId : int
>> - price : double
>>
>> For each product, I want to get the seller who sells the product for the
>> cheapest price.
>>
>> Naive approach would be to do this, but I would expect two shuffles:
>>
>> import spark.sql.functions as F
>> cheapest_prices_df  =
>> df.groupby('productId').agg(F.min('price').alias('price'))
>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
>> 'price'])
>>
>> I would have loved to do this instead:
>>
>> import spark.sql.functions as F
>> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
>> F.min_by('sellerId', 'price'))
>>
>> Unfortunately min_by does not seem available in pyspark sql functions,
>> whereas I can see it in the doc :
>> https://spark.apache.org/docs/latest/api/sql/index.html
>>
>> I have managed to use min_by with this approach but it looks slow (maybe
>> because of temp table creation ?):
>>
>> df.createOrReplaceTempView("table")
>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
>> min(price) from table group by productId")
>>
>> Is there a way I can rely on min_by directly in groupby ?
>> Is there some code missing in pyspark wrapper to make min_by visible
>> somehow ?
>>
>> Thank you in advance for your help.
>>
>> Cheers
>> David
>>
> --
Best Regards,
Ayan Guha


RE: Logging to determine why driver fails

2022-02-21 Thread Michael Williams (SSI)
Thank you.

From: Artemis User [mailto:arte...@dtechspace.com]
Sent: Monday, February 21, 2022 8:23 AM
To: Michael Williams (SSI) 
Subject: Re: Logging to determine why driver fails

Spark uses Log4j for logging.  There is a log4j properties template file 
located in the conf directory.  You can use that as a reference to turn on 
logging.  More info on log4j can be found on the logging.apache.org web site.
On 2/21/22 9:15 AM, Michael Williams (SSI) wrote:
Hello,

We have a POC using Spark 3.2.1 and none of us have any prior Spark experience. 
 Our setup uses the native Spark REST api 
(http://localhost:6066/v1/submissions/create)
 on the master node (not Livy, not Spark Job server).  While we have been 
successful at submitting python jobs via this endpoint, when we implemented 
.NET for Spark and have been attempting to trigger those jobs using the api, 
the driver (on the worker) simply reports failed, but there aren’t any log 
files created because it is failing before the application starts.

Is there a logging configuration that can be made that might increase the 
logging detail on the worker for internal Spark processes and possibly tell us 
specifically the error occurring?

Thank you,
Mike




Re: Logging to determine why driver fails

2022-02-21 Thread Artemis User
Spark uses log4j for logging.  There is a log4j properties template file 
in the conf directory.  Just remove the "template" extension and change 
the content of log4j.properties to meet your needs. More info on log4j
can be found at logging.apache.org...
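
As an illustration only, a log4j 1.x properties sketch based on Spark's bundled
template that raises the root log level so driver and worker failures show up
in more detail; adjust the appenders and levels to your environment:

# conf/log4j.properties
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Quieten a chatty third-party logger if DEBUG becomes too noisy
log4j.logger.org.sparkproject.jetty=WARN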


On 2/21/22 9:15 AM, Michael Williams (SSI) wrote:


Hello,

We have a POC using Spark 3.2.1 and none of us have any prior Spark 
experience.  Our setup uses the native Spark REST api 
(http://localhost:6066/v1/submissions/create) on the master node (not 
Livy, not Spark Job server).  While we have been successful at 
submitting python jobs via this endpoint, when we implemented .NET for 
Spark and have been attempting to trigger those jobs using the api, 
the driver (on the worker) simply reports failed, but there aren’t any 
log files created because it is failing before the application starts.


Is there a logging configuration that can be made that might increase 
the logging detail on the worker for internal Spark processes and 
possibly tell us specifically the error occurring?


Thank you,

Mike



Logging to determine why driver fails

2022-02-21 Thread Michael Williams (SSI)
Hello,

We have a POC using Spark 3.2.1 and none of us have any prior Spark experience. 
 Our setup uses the native Spark REST api 
(http://localhost:6066/v1/submissions/create) on the master node (not Livy, not 
Spark Job server).  While we have been successful at submitting python jobs via 
this endpoint, when we implemented .NET for Spark and have been attempting to 
trigger those jobs using the api, the driver (on the worker) simply reports 
failed, but there aren't any log files created because it is failing before the 
application starts.

Is there a logging configuration that can be made that might increase the 
logging detail on the worker for internal Spark processes and possibly tell us 
specifically the error occurring?

Thank you,
Mike





Re: Question about spark.sql min_by

2022-02-21 Thread Sean Owen
From the source code, it looks like this function was added to pyspark in
Spark 3.3, up for release soon. It exists in SQL. You can still use it in
SQL with `spark.sql(...)` in Python though, not hard.

On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
wrote:

> Hello all,
>
> I'm trying to use the spark.sql min_by aggregation function with pyspark.
> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>
> I have a dataframe made of these columns:
> - productId : int
> - sellerId : int
> - price : double
>
> For each product, I want to get the seller who sells the product for the
> cheapest price.
>
> Naive approach would be to do this, but I would expect two shuffles:
>
> import spark.sql.functions as F
> cheapest_prices_df  =
> df.groupby('productId').agg(F.min('price').alias('price'))
> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
> 'price'])
>
> I would have loved to do this instead:
>
> import spark.sql.functions as F
> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
> F.min_by('sellerId', 'price'))
>
> Unfortunately min_by does not seem available in pyspark sql functions,
> whereas I can see it in the doc :
> https://spark.apache.org/docs/latest/api/sql/index.html
>
> I have managed to use min_by with this approach but it looks slow (maybe
> because of temp table creation ?):
>
> df.createOrReplaceTempView("table")
> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
> min(price) from table group by productId")
>
> Is there a way I can rely on min_by directly in groupby ?
> Is there some code missing in pyspark wrapper to make min_by visible
> somehow ?
>
> Thank you in advance for your help.
>
> Cheers
> David
>


Re: Encoders.STRING() causing performance problems in Java application

2022-02-21 Thread Sean Owen
Oh, yes of course. If you run an entire distributed Spark job for one row,
over and over, that's much slower. It would make much more sense to run the
whole data set at once - the point is parallelism here.
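
To illustrate the point in PySpark terms (a sketch only; the original code is
Java, and fitted_pipeline / texts here are placeholders): build one DataFrame
holding all of the strings and call transform once, instead of once per string.

texts = ["first document", "second document", "third document"]  # placeholder data

# One DataFrame holding every row to score
data = spark.createDataFrame([(t,) for t in texts], ["text"])

# fitted_pipeline is assumed to be an already-fitted pyspark.ml PipelineModel
predictions = fitted_pipeline.transform(data)
predictions.show()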

On Mon, Feb 21, 2022 at 2:36 AM  wrote:

> Thanks a lot, Sean, for the comments. I realize I didn't provide enough
> background information to properly diagnose this issue.
>
> In the meantime, I have created some test cases for isolating the problem
> and running some specific performance tests. The numbers are quite
> revealing: running our Spark model individually on Strings takes about 8
> seconds for the test data, whereas it takes 88 ms when run on the entire data in
> a single Dataset. This is a factor of 100x. This gets even worse for larger
> datasets.
>
> So, the root cause here is the way the Spark model is being called for one
> string at a time by the self-built prediction pipeline (which is also using
> other ML techniques apart from Spark). Needs some re-factoring...
>
> Thanks again for the help.
>
> Cheers,
>
> Martin
>
>
> Am 2022-02-18 13:41, schrieb Sean Owen:
>
> That doesn't make a lot of sense. Are you profiling the driver, rather
> than executors where the work occurs?
> Is your data set quite small such that small overheads look big?
> Do you even need Spark if your data is not distributed - coming from the
> driver anyway?
>
> The fact that a static final field did anything suggests something is
> amiss with your driver program. Are you perhaps inadvertently serializing
> your containing class with a bunch of other data by using its methods in a
> closure?
> If your data is small it's not surprising that the overhead could be in
> just copying the data around, the two methods you cite, rather than the
> compute.
> Too many things here to really say what's going on.
>
>
> On Fri, Feb 18, 2022 at 12:42 AM  wrote:
>
> Hello,
>
> I am working on optimising the performance of a Java ML/NLP application
> based on Spark / SparkNLP. For prediction, I am applying a trained model on
> a Spark dataset which consists of one column with only one row. The dataset
> is created like this:
>
> List<String> textList = Collections.singletonList(text);
> Dataset<Row> data = sparkSession
> .createDataset(textList, Encoders.STRING())
> .withColumnRenamed(COL_VALUE, COL_TEXT);
>
>
> The predictions are created like this:
>
> PipelineModel fittedPipeline = pipeline.fit(dataset);
>
> Dataset<Row> prediction = fittedPipeline.transform(dataset);
>
>
> We noticed that the performance isn't quite as good as expected. After
> profiling the application with VisualVM, I noticed that the problem is with
> org.apache.spark.sql.Encoders.STRING() in the creation of the dataset,
> which by itself takes up about 75% of the time for the whole prediction
> method call.
>
> So, is there a simpler and more efficient way of creating the required
> dataset, consisting of one column and one String row?
>
> Thanks a lot.
>
> Cheers,
>
> Martin
>
>


Question about spark.sql min_by

2022-02-21 Thread David Diebold
Hello all,

I'm trying to use the spark.sql min_by aggregation function with pyspark.
I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2

I have a dataframe made of these columns:
- productId : int
- sellerId : int
- price : double

For each product, I want to get the seller who sells the product for the
cheapest price.

Naive approach would be to do this, but I would expect two shuffles:

import spark.sql.functions as F
cheapest_prices_df  =
df.groupby('productId').agg(F.min('price').alias('price'))
cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', 'price'])

I would have loved to do this instead:

import spark.sql.functions as F
cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
F.min_by('sellerId', 'price'))

Unfortunately min_by does not seem available in pyspark sql functions,
whereas I can see it in the doc :
https://spark.apache.org/docs/latest/api/sql/index.html

I have managed to use min_by with this approach but it looks slow (maybe
because of temp table creation ?):

df.createOrReplaceTempView("table")
cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
min(price) from table group by productId")

Is there a way I can rely on min_by directly in groupby ?
Is there some code missing in pyspark wrapper to make min_by visible
somehow ?

Thank you in advance for your help.

Cheers
David


Re: Encoders.STRING() causing performance problems in Java application

2022-02-21 Thread martin



Thanks a lot, Sean, for the comments. I realize I didn't provide enough 
background information to properly diagnose this issue.


In the meantime, I have created some test cases for isolating the 
problem and running some specific performance tests. The numbers are 
quite revealing: Running our Spark model individually on Strings takes 
about 8 Sec for the test data, whereas is take 88 ms when run on the 
entire data in a single Dataset. This is a factor of 100x. This gets 
even worse for larger datasets.


So, the root cause here is the way the Spark model is being called for 
one string at a time by the self-built prediction pipeline (which is 
also using other ML techniques apart from Spark). Needs some 
re-factoring...


Thanks again for the help.

Cheers,

Martin

Am 2022-02-18 13:41, schrieb Sean Owen:

That doesn't make a lot of sense. Are you profiling the driver, rather 
than executors where the work occurs?

Is your data set quite small such that small overheads look big?
Do you even need Spark if your data is not distributed - coming from 
the driver anyway?


The fact that a static final field did anything suggests something is 
amiss with your driver program. Are you perhaps inadvertently 
serializing your containing class with a bunch of other data by using 
its methods in a closure?
If your data is small it's not surprising that the overhead could be in 
just copying the data around, the two methods you cite, rather than the 
compute.

Too many things here to really say what's going on.

On Fri, Feb 18, 2022 at 12:42 AM  wrote:


Hello,

I am working on optimising the performance of a Java ML/NLP 
application based on Spark / SparkNLP. For prediction, I am applying a 
trained model on a Spark dataset which consists of one column with 
only one row. The dataset is created like this:


List<String> textList = Collections.singletonList(text);
Dataset<Row> data = sparkSession
.createDataset(textList, Encoders.STRING())
.withColumnRenamed(COL_VALUE, COL_TEXT);

The predictions are created like this:

PipelineModel fittedPipeline = pipeline.fit(dataset);

Dataset<Row> prediction = fittedPipeline.transform(dataset);

We noticed that the performance isn't quite as good as expected. After 
profiling the application with VisualVM, I noticed that the problem is 
with org.apache.spark.sql.Encoders.STRING() in the creation of the 
dataset, which by itself takes up about 75% of the time for the whole 
prediction method call.


So, is there a simpler and more efficient way of creating the required 
dataset, consisting of one column and one String row?


Thanks a lot.

Cheers,

Martin

Re: Spark Explain Plan and Joins

2022-02-21 Thread Gourav Sengupta
Hi,

I think that the best option is to use the Spark UI. In Spark 3.x the UI
and its additional settings are fantastic. Also try the settings for
Adaptive Query Execution in Spark; under certain conditions it really works
wonders.
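
As an illustration, these are the standard Spark 3.x AQE settings (values shown
are examples only; df is a placeholder DataFrame):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# The adaptive plan can then be inspected with explain() and, after execution,
# in the SQL tab of the Spark UI:
df.explain(mode="formatted")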

For certain long queries, depending on how you finally trigger the query
execution, whether you are using Spark DataFrames or Spark SQL, the settings
in Spark (look at the settings for Spark 3.x), and a few other aspects, you
will see that the plan is quite cryptic and sometimes difficult to read.

Regards,
Gourav Sengupta

On Sun, Feb 20, 2022 at 7:32 PM Sid Kal  wrote:

> Hi Gourav,
>
> Right now I am just trying to understand the query execution plan by
> executing a simple join example via Spark SQL. The overall goal is to
> understand these plans so that going forward if my query runs slow due to
> data skewness or some other issues, I should be able to at least understand
> what exactly is happening at the master and slave sides like map reduce.
>
> On Sun, Feb 20, 2022 at 9:06 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> what are you trying to achieve by this?
>>
>> If there is a performance deterioration, try to collect the query
>> execution run time statistics from SPARK SQL. They can be seen from the
>> SPARK SQL UI and available over API's in case I am not wrong.
>>
>> Please ensure that you are not trying to over automate things.
>>
>> Reading how to understand the plans may be good depending on what you are
>> trying to do.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sat, Feb 19, 2022 at 10:00 AM Sid Kal  wrote:
>>
>>> I wrote a query like below and I am trying to understand its query
>>> execution plan.
>>>
>>> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join
>>> df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>>> == Parsed Logical Plan ==
>>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>>:- 'SubqueryAlias a
>>>:  +- 'UnresolvedRelation [df], [], false
>>>+- 'SubqueryAlias b
>>>   +- 'UnresolvedRelation [df1], [], false
>>>
>>> == Analyzed Logical Plan ==
>>> CustomerID: int, CustomerName: string, state: string
>>> Project [CustomerID#640, CustomerName#641, state#988]
>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>:- SubqueryAlias a
>>>:  +- SubqueryAlias df
>>>: +-
>>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
>>> csv
>>>+- SubqueryAlias b
>>>   +- SubqueryAlias df1
>>>  +-
>>> Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
>>> csv
>>>
>>> == Optimized Logical Plan ==
>>> Project [CustomerID#640, CustomerName#641, state#988]
>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>:- Project [CustomerID#640, CustomerName#641]
>>>:  +- Filter isnotnull(CustomerID#640)
>>>: +-
>>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
>>> csv
>>>+- Project [CustomerID#978, State#988]
>>>   +- Filter isnotnull(CustomerID#978)
>>>  +-
>>> Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
>>> csv
>>>
>>> == Physical Plan ==
>>> *(5) Project [CustomerID#640, CustomerName#641, state#988]
>>> +- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
>>>:- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(CustomerID#640, 200),
>>> ENSURE_REQUIREMENTS, [id=#451]
>>>: +- *(1) Filter isnotnull(CustomerID#640)
>>>:+- FileScan csv [CustomerID#640,CustomerName#641] Batched:
>>> false, DataFilters: [isnotnull(CustomerID#640)], Format: CSV, Location:
>>> InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema:
>>> struct
>>>+- *(4) Sort [CustomerID#978 ASC NULLS FIRST], false, 0
>>>   +- Exchange hashpartitioning(CustomerID#978, 200),
>>> ENSURE_REQUIREMENTS, [id=#459]
>>>