Re: Profiling PySpark Pandas UDF

2022-08-26 Thread Abdeali Kothari
Hi Luca, I see you pushed some code to the PR 3 hrs ago.
That's awesome. If I can help out in any way - do let me know.
I think that's an amazing feature, and it would be great if it can get into
Spark.

On Fri, 26 Aug 2022, 12:41 Luca Canali,  wrote:

> @Abdeali as for “lightweight profiling”, there is some work in progress on
> instrumenting Python UDFs with Spark metrics, see
> https://issues.apache.org/jira/browse/SPARK-34265
>
> However it is a bit stuck at the moment, and needs to be revived I
> believe.
>
>
>
> Best,
>
> Luca
>
>
>
> *From:* Abdeali Kothari 
> *Sent:* Friday, August 26, 2022 06:36
> *To:* Subash Prabanantham 
> *Cc:* Russell Jurney ; Gourav Sengupta <
> gourav.sengu...@gmail.com>; Sean Owen ; Takuya UESHIN <
> ues...@happy-camper.st>; user 
> *Subject:* Re: Profiling PySpark Pandas UDF
>
>
>
> The python profiler is pretty cool!
>
> I'll try it out to see what could be taking time within the UDF with it.
>
>
>
> I'm wondering if there is also some lightweight profiling (which does not
> slow down my processing) for me to get:
>
>
>
>  - how much time the UDF took (like how much time was spent inside the UDF)
>
>  - how many times the UDF was called
>
>
>
> I can see the overall time a stage took in the Spark UI - would be cool if
> I could find the time a UDF takes too
>
>
>
> On Fri, 26 Aug 2022, 00:25 Subash Prabanantham, 
> wrote:
>
> Wow, lots of good suggestions. I didn’t know about the profiler either.
> Great suggestion @Takuya.
>
>
>
>
>
> Thanks,
>
> Subash
>
>
>
> On Thu, 25 Aug 2022 at 19:30, Russell Jurney 
> wrote:
>
> YOU know what you're talking about and aren't hacking a solution. You are
> my new friend :) Thank you, this is incredibly helpful!
>
>
>
>
> Thanks,
>
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com
>
>
>
>
>
> On Thu, Aug 25, 2022 at 10:52 AM Takuya UESHIN 
> wrote:
>
> Hi Subash,
>
> Have you tried the Python/Pandas UDF Profiler introduced in Spark 3.3?
> -
> https://spark.apache.org/docs/latest/api/python/development/debugging.html#python-pandas-udf
>
> Hope it can help you.
>
> Thanks.
>
>
>
> On Thu, Aug 25, 2022 at 10:18 AM Russell Jurney 
> wrote:
>
> Subash, I’m here to help :)
>
>
>
> I started a test script to demonstrate a solution last night but got a
> cold and haven’t finished it. Give me another day and I’ll get it to you.
> My suggestion is that you run PySpark locally in pytest with a fixture to
> generate and yield your SparkContext and SparkSession, and then write tests
> that load some test data, perform some count operation and checkpoint to
> ensure that data is loaded, start a timer, run your UDF on the DataFrame,
> checkpoint again or write some output to disk to make sure it finishes and
> then stop the timer and compute how long it takes. I’ll show you some code,
> I have to do this for Graphlet AI’s RTL utils and other tools to figure out
> how much overhead there is using Pandera and Spark together to validate
> data: https://github.com/Graphlet-AI/graphlet
>
>
>
> I’ll respond by tomorrow evening with code in a gist! We’ll see if it gets
> consistent, measurable and valid results! :)
>
>
>
> Russell Jurney
>
>
>
> On Thu, Aug 25, 2022 at 10:00 AM Sean Owen  wrote:
>
> It's important to realize that while pandas UDFs and pandas on Spark are
> both related to pandas, they are not themselves directly related. The first
> lets you use pandas within Spark, the second lets you use pandas on Spark.
>
>
>
> Hard to say with this info but you want to look at whether you are doing
> something expensive in each UDF call and consider amortizing it with the
> scalar iterator UDF pattern. Maybe.
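
(For reference, a sketch of the scalar iterator pandas UDF pattern mentioned above - the in-memory lookup table below stands in for whatever expensive per-call setup is being amortized:)

    from typing import Iterator

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("double")
    def scored(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        lookup = {i: float(i) * 10 for i in range(1000)}       # expensive setup, done once per task
        for batch in batches:                                  # then reused for every batch
            yield batch.map(lambda v: lookup.get(int(v), 0.0))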
>
>
>
> A pandas UDF is not Spark code itself, so no, there is no tool in Spark to
> profile it. Conversely, any approach to profiling pandas or Python would
> work here.
>
>
>
> On Thu, Aug 25, 2022, 11:22 AM Gourav Sengupta 
> wrote:
>
> Hi,
>
>
>
> May be I am jumping to conclusions and making stupid guesses, but have you
> tried koalas now that it is natively integrated with pyspark??
>
>
>
> Regards
>
> Gourav
>
>
>
> On Thu, 25 Aug 2022, 11:07 Subash Prabanantham, 
> wrote:
>
> Hi All,
>
>
>
> I was wondering if we have any best practices on using pandas UDF ?
> Profiling UDF is not an easy task and our case requires some drilling down
> on the logic of the function.
>
>
&

Re: Profiling PySpark Pandas UDF

2022-08-25 Thread Abdeali Kothari
The python profiler is pretty cool!
I'll try it out to see what could be taking time within the UDF with it.
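
For anyone else who needs it, a minimal sketch of turning that profiler on, based on the debugging page linked below (assuming the spark.python.profile config and SparkContext.show_profiles(), which is how I understand it works):

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf

    # spark.python.profile must be set before the SparkContext is created
    spark = (SparkSession.builder
             .config("spark.python.profile", "true")
             .getOrCreate())

    @pandas_udf("double")
    def plus_one(s: pd.Series) -> pd.Series:
        return s + 1                       # whatever work the UDF does gets profiled

    df = spark.range(10).selectExpr("cast(id as double) as v")
    df.select(plus_one("v")).collect()

    spark.sparkContext.show_profiles()     # per-UDF cProfile output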

I'm wondering if there is also some lightweight profiling (which does not
slow down my processing) for me to get:

 - how much time the UDF took (like how much time was spent inside the UDF)
 - how many times the UDF was called

I can see the overall time a stage took in the Spark UI - would be cool if
I could find the time a UDF takes too
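
In the meantime, a rough sketch of approximating both numbers with plain accumulators (not a built-in Spark feature; the timing adds a little overhead, and with a pandas UDF the call count is per batch, not per row):

    import time
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf

    spark = SparkSession.builder.getOrCreate()
    udf_time = spark.sparkContext.accumulator(0.0)   # approx seconds spent inside the UDF
    udf_calls = spark.sparkContext.accumulator(0)    # number of batch invocations

    @pandas_udf("double")
    def timed_double(col: pd.Series) -> pd.Series:
        start = time.time()
        result = col * 2                             # stand-in for the real UDF logic
        udf_calls.add(1)
        udf_time.add(time.time() - start)
        return result

    df = spark.range(100).selectExpr("cast(id as double) as v")
    df.select(timed_double("v")).count()
    print(udf_time.value, udf_calls.value)           # readable on the driver after the action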

On Fri, 26 Aug 2022, 00:25 Subash Prabanantham, 
wrote:

> Wow, lots of good suggestions. I didn’t know about the profiler either.
> Great suggestion @Takuya.
>
>
> Thanks,
> Subash
>
> On Thu, 25 Aug 2022 at 19:30, Russell Jurney 
> wrote:
>
>> YOU know what you're talking about and aren't hacking a solution. You are
>> my new friend :) Thank you, this is incredibly helpful!
>>
>>
>> Thanks,
>> Russell Jurney @rjurney 
>> russell.jur...@gmail.com LI  FB
>>  datasyndrome.com
>>
>>
>> On Thu, Aug 25, 2022 at 10:52 AM Takuya UESHIN 
>> wrote:
>>
>>> Hi Subash,
>>>
>>> Have you tried the Python/Pandas UDF Profiler introduced in Spark 3.3?
>>> -
>>> https://spark.apache.org/docs/latest/api/python/development/debugging.html#python-pandas-udf
>>>
>>> Hope it can help you.
>>>
>>> Thanks.
>>>
>>> On Thu, Aug 25, 2022 at 10:18 AM Russell Jurney <
>>> russell.jur...@gmail.com> wrote:
>>>
 Subash, I’m here to help :)

 I started a test script to demonstrate a solution last night but got a
 cold and haven’t finished it. Give me another day and I’ll get it to you.
 My suggestion is that you run PySpark locally in pytest with a fixture to
 generate and yield your SparkContext and SparkSession, and then write tests
 that load some test data, perform some count operation and checkpoint to
 ensure that data is loaded, start a timer, run your UDF on the DataFrame,
 checkpoint again or write some output to disk to make sure it finishes and
 then stop the timer and compute how long it takes. I’ll show you some code,
 I have to do this for Graphlet AI’s RTL utils and other tools to figure out
 how much overhead there is using Pandera and Spark together to validate
 data: https://github.com/Graphlet-AI/graphlet

 I’ll respond by tomorrow evening with code in a gist! We’ll see if it
 gets consistent, measurable and valid results! :)

 Russell Jurney

 On Thu, Aug 25, 2022 at 10:00 AM Sean Owen  wrote:

> It's important to realize that while pandas UDFs and pandas on Spark
> are both related to pandas, they are not themselves directly related. The
> first lets you use pandas within Spark, the second lets you use pandas on
> Spark.
>
> Hard to say with this info but you want to look at whether you are
> doing something expensive in each UDF call and consider amortizing it with
> the scalar iterator UDF pattern. Maybe.
>
> A pandas UDF is not Spark code itself, so no, there is no tool in Spark
> to profile it. Conversely, any approach to profiling pandas or Python would
> work here.
>
> On Thu, Aug 25, 2022, 11:22 AM Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> May be I am jumping to conclusions and making stupid guesses, but
>> have you tried koalas now that it is natively integrated with pyspark??
>>
>> Regards
>> Gourav
>>
>> On Thu, 25 Aug 2022, 11:07 Subash Prabanantham, <
>> subashpraba...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I was wondering if we have any best practices on using pandas UDF ?
>>> Profiling UDF is not an easy task and our case requires some drilling 
>>> down
>>> on the logic of the function.
>>>
>>>
>>> Our use case:
> >>> We are using func(DataFrame) => DataFrame as the interface for the Pandas
> >>> UDF. When running only the function locally it runs fast, but when
> >>> executed in the Spark environment the processing time is more than
> >>> expected.
>>> We have one column where the value is large (BinaryType -> 600KB),
>>> wondering whether this could make the Arrow computation slower ?
>>>
>>> Is there any profiling or best way to debug the cost incurred using
>>> pandas UDF ?
>>>
>>>
>>> Thanks,
>>> Subash
>>>
>>> --

 Thanks,
 Russell Jurney @rjurney 
 russell.jur...@gmail.com LI  FB
  datasyndrome.com

>>>
>>>
>>> --
>>> Takuya UESHIN
>>>
>>>


Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-06 Thread Abdeali Kothari
Thanks a lot for the reply Albert.

On looking at it and reading about it further - I do see that
"AdaptiveSparkPlan isFinalPlan=false" is mentioned.

Could you point me to how I can see the final plan ? I couldn't find that
in any of the resources I was referring to
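
(For reference, one way to see it - a sketch, assuming Spark 3.2's AQE behaviour where explain() shows the finalized plan once the query has actually run; df2/df3/df4 refer to the reproduction quoted below:)

    df4 = df2.join(df3, df2["id"] == df3["id"])
    df4.explain()   # before any action: AdaptiveSparkPlan isFinalPlan=false
    df4.collect()   # run the query so AQE can finish re-optimizing
    df4.explain()   # after the action: AdaptiveSparkPlan isFinalPlan=true, with the final plan
    # The SQL tab of the Spark UI shows the same final (re-optimized) plan.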

On Fri, 7 Jan 2022, 07:25 Albert,  wrote:

> I happen to encounter something similar.
>
> it's probably because you are just `explain`ing it. When you actually `run`
> it, you will get the final spark plan, in which case the exchange will be
> reused.
> Right, this is different compared with 3.1, probably because of the upgraded
> AQE.
>
> not sure whether this is expected though.
>
> On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari 
> wrote:
>
>> Just thought I'd do a quick bump and add the dev mailing list - in case
>> there is some insight there
>> Feels like this should be categorized as a bug for spark 3.2.0
>>
>> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
>> wrote:
>>
>>> Hi,
>>> I am using pyspark for some projects. And one of the things we are doing
>>> is trying to find the tables/columns being used by Spark using the
>>> execution plan.
>>>
>>> When we upgrade to spark 3.2 - the spark plan seems to be different from
>>> previous versions - mainly when we are doing joins.
>>> Below is a reproducible example (you could run the same in versions 2.3
>>> to 3.1 to see the difference)
>>>
>>> My original data frames have the columns: id#0 and id#4
>>> But after doing the joins we are seeing new columns id#34 and id#19
>>> which are not created from the original dataframes I was working with.
>>> In previous versions of spark, this used to use a ReusedExchange step
>>> (shown below)
>>>
>>> I was trying to understand if this is expected in spark 3.2 where the
>>> execution plan seems to be creating a new data source which does not
>>> originate from df1 and df2 which I provided.
>>> NOTE: The same happens even if I read from parquet files
>>>
>>> In spark 3.2:
>>> In [1]: import pyspark
>>>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>>
>>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
>>> 'col2'])
>>>...: df1.explain()
>>>...: df2.explain()
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>>
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>>
>>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>>...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>>> [id=#53]
>>>: +- Filter isnotnull(id#4L)
>>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>>+- Project [id#0L, col1#1L, col2#20L]
>>>   +- SortMergeJoin [id#0L], [id#19L], Inner
>>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>  :  +- Exchange hashpartitioning(id#0L, 200),
>>> ENSURE_REQUIREMENTS, [id=#45]
>>>  : +- Filter isnotnull(id#0L)
>>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>>
>>>
>>>
> >>>          +- Sort [id#19L ASC NULLS FIRST], false, 0
> >>>             +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
> >>>                +- Filter isnotnull(id#19L)
> >>>                   +- Scan ExistingRDD[id#19L,col2#20L]
>>>
>>> In [4]: df1.createOrReplaceTempView('df1')
>>>...: df2.createOrReplaceTempView('df2')
>>>...: df3 = spark.sql("""
>>>...: SELECT df1.id, df1.col1, df2.col2
>>>...: FROM df1 JOIN df2 ON df1.id = df2.id
>>>...: """)
>>>...: df3.createOrReplaceTempView('df3')
>>>...: df4 = spark.sql("""
>>>...: SELECT df2.*, df3.*
>>>...: FROM df2 JOIN df3 ON df2.id = df3.id
>>>...: """)
>>>...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>>:

Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-05 Thread Abdeali Kothari
Just thought I'd do a quick bump and add the dev mailing list - in case
there is some insight there
Feels like this should be categorized as a bug for spark 3.2.0

On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
wrote:

> Hi,
> I am using pyspark for some projects. And one of the things we are doing
> is trying to find the tables/columns being used by Spark using the
> execution plan.
>
> When we upgrade to spark 3.2 - the spark plan seems to be different from
> previous versions - mainly when we are doing joins.
> Below is a reproducible example (you could run the same in versions 2.3 to
> 3.1 to see the difference)
>
> My original data frames have the columns: id#0 and id#4
> But after doing the joins we are seeing new columns id#34 and id#19 which
> are not created from the original dataframes I was working with.
> In previous versions of spark, this used to use a ReusedExchange step
> (shown below)
>
> I was trying to understand if this is expected in spark 3.2 where the
> execution plan seems to be creating a new data source which does not
> originate from df1 and df2 which I provided.
> NOTE: The same happens even if I read from parquet files
>
> In spark 3.2:
> In [1]: import pyspark
>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>
> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
> 'col2'])
>...: df1.explain()
>...: df2.explain()
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#0L,col1#1L]
>
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#4L,col2#5L]
>
> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>:- Sort [id#4L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
> [id=#53]
>: +- Filter isnotnull(id#4L)
>:+- Scan ExistingRDD[id#4L,col2#5L]
>+- Project [id#0L, col1#1L, col2#20L]
>   +- SortMergeJoin [id#0L], [id#19L], Inner
>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#45]
>  : +- Filter isnotnull(id#0L)
>  :+- Scan ExistingRDD[id#0L,col1#1L]
>
>
>
>          +- Sort [id#19L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>                +- Filter isnotnull(id#19L)
>                   +- Scan ExistingRDD[id#19L,col2#20L]
>
> In [4]: df1.createOrReplaceTempView('df1')
>...: df2.createOrReplaceTempView('df2')
>...: df3 = spark.sql("""
>...: SELECT df1.id, df1.col1, df2.col2
>...: FROM df1 JOIN df2 ON df1.id = df2.id
>...: """)
>...: df3.createOrReplaceTempView('df3')
>...: df4 = spark.sql("""
>...: SELECT df2.*, df3.*
>...: FROM df2 JOIN df3 ON df2.id = df3.id
>...: """)
>...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>:- Sort [id#4L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
> [id=#110]
>: +- Filter isnotnull(id#4L)
>:+- Scan ExistingRDD[id#4L,col2#5L]
>+- Project [id#0L, col1#1L, col2#35L]
>   +- SortMergeJoin [id#0L], [id#34L], Inner
>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#102]
>  : +- Filter isnotnull(id#0L)
>  :+- Scan ExistingRDD[id#0L,col1#1L]
>
>
>
>          +- Sort [id#34L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>                +- Filter isnotnull(id#34L)
>                   +- Scan ExistingRDD[id#34L,col2#35L]
>
>
> Doing this in spark 3.1.1 - the plan is:
>
> *(8) SortMergeJoin [id#4L], [id#0L], Inner
> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
> : +- *(1) Filter isnotnull(id#4L)
> :+- *(1) Scan ExistingRDD[id#4L,col2#5L]
> +- *(7) Project [id#0L, col1#1L, col2#20L]
>+- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>   :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>   :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#62]
>   : +- *(3) Filter isnotnull(id#0L)
>   :+- *(3) Scan ExistingRDD[id#0L,col1#1L]
>
>    +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
>       +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>
>


Spark 3.2 - ReusedExchange not present in join execution plan

2021-12-29 Thread Abdeali Kothari
Hi,
I am using pyspark for some projects. And one of the things we are doing is
trying to find the tables/columns being used by Spark using the execution
plan.

When we upgrade to spark 3.2 - the spark plan seems to be different from
previous versions - mainly when we are doing joins.
Below is a reproducible example (you could run the same in versions 2.3 to
3.1 to see the difference)

My original data frames have the columns: id#0 and id#4
But after doing the joins we are seeing new columns id#34 and id#19 which
are not created from the original dataframes I was working with.
In previous versions of spark, this used to use a ReusedExchange step
(shown below)

I was trying to understand if this is expected in spark 3.2 where the
execution plan seems to be creating a new data source which does not
originate from df1 and df2 which I provided.
NOTE: The same happens even if I read from parquet files

In spark 3.2:
In [1]: import pyspark
   ...: spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
   ...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
'col2'])
   ...: df1.explain()
   ...: df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#0L,col1#1L]

== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,col2#5L]

In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
   ...: df4 = df2.join(df3, df1['id'] == df2['id'])
   ...: df4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#4L], [id#0L], Inner
   :- Sort [id#4L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
[id=#53]
   : +- Filter isnotnull(id#4L)
   :+- Scan ExistingRDD[id#4L,col2#5L]
   +- Project [id#0L, col1#1L, col2#20L]
  +- SortMergeJoin [id#0L], [id#19L], Inner
 :- Sort [id#0L ASC NULLS FIRST], false, 0
 :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
[id=#45]
 : +- Filter isnotnull(id#0L)
 :+- Scan ExistingRDD[id#0L,col1#1L]



         +- Sort [id#19L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
               +- Filter isnotnull(id#19L)
                  +- Scan ExistingRDD[id#19L,col2#20L]

In [4]: df1.createOrReplaceTempView('df1')
   ...: df2.createOrReplaceTempView('df2')
   ...: df3 = spark.sql("""
   ...: SELECT df1.id, df1.col1, df2.col2
   ...: FROM df1 JOIN df2 ON df1.id = df2.id
   ...: """)
   ...: df3.createOrReplaceTempView('df3')
   ...: df4 = spark.sql("""
   ...: SELECT df2.*, df3.*
   ...: FROM df2 JOIN df3 ON df2.id = df3.id
   ...: """)
   ...: df4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#4L], [id#0L], Inner
   :- Sort [id#4L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
[id=#110]
   : +- Filter isnotnull(id#4L)
   :+- Scan ExistingRDD[id#4L,col2#5L]
   +- Project [id#0L, col1#1L, col2#35L]
  +- SortMergeJoin [id#0L], [id#34L], Inner
 :- Sort [id#0L ASC NULLS FIRST], false, 0
 :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
[id=#102]
 : +- Filter isnotnull(id#0L)
 :+- Scan ExistingRDD[id#0L,col1#1L]



         +- Sort [id#34L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
               +- Filter isnotnull(id#34L)
                  +- Scan ExistingRDD[id#34L,col2#35L]


Doing this in spark 3.1.1 - the plan is:

*(8) SortMergeJoin [id#4L], [id#0L], Inner
:- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
: +- *(1) Filter isnotnull(id#4L)
:+- *(1) Scan ExistingRDD[id#4L,col2#5L]
+- *(7) Project [id#0L, col1#1L, col2#20L]
   +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
  :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
  :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
[id=#62]
  : +- *(3) Filter isnotnull(id#0L)
  :+- *(3) Scan ExistingRDD[id#0L,col1#1L]

   +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]


Re: Collecting list of errors across executors

2021-08-03 Thread Abdeali Kothari
You could create a custom accumulator using a linkedlist or so.

Some examples that could help:
https://towardsdatascience.com/custom-pyspark-accumulators-310f63ca3c8c
https://stackoverflow.com/questions/34798578/how-to-create-custom-list-accumulator-i-e-listint-int
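
A minimal sketch of that idea (the class name and the failing per-record logic below are just for illustration):

    from pyspark.accumulators import AccumulatorParam
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    class ListAccumulator(AccumulatorParam):
        def zero(self, initial_value):
            return []

        def addInPlace(self, v1, v2):
            v1.extend(v2)
            return v1

    errors = sc.accumulator([], ListAccumulator())

    def do_something(x):
        try:
            if x % 3 == 0:                     # stand-in for the real per-record logic
                raise ValueError("bad record: %s" % x)
        except Exception as exc:
            errors.add([str(exc)])             # collect the error message

    sc.parallelize(range(10)).foreach(do_something)
    print(errors.value)                        # list of error strings, back on the driver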


On Tue, Aug 3, 2021 at 1:23 PM Sachit Murarka 
wrote:

> Hi Team,
>
> We are using rdd.foreach(lambda x : do_something(x))
>
> Our use case requires collecting of the error messages in a list which are
> coming up in the exception block of the method do_something.
> Since this will be running on executor , a global list won't work here. As
> the state needs to be shared among various executors, I thought of using
> Accumulator,
> but the accumulator uses only Integral values.
>
> Can someone please suggest how do I collect all errors in a list which are
> coming from all records of RDD.
>
> Thanks,
> Sachit Murarka
>


Re: script running in jupyter 6-7x faster than spark submit

2019-09-11 Thread Abdeali Kothari
In a bash terminal, can you do:

    export PYSPARK_DRIVER_PYTHON=/path/to/venv/bin/python

and then run the spark-shell script?

This should mimic the behaviour of jupyter in spark-shell and should be
fast (1-2mins similar to jupyter notebook)
This would confirm the guess that the python2.7 venv has some magic ^_^



On Wed, Sep 11, 2019 at 10:32 PM Dhrubajyoti Hati 
wrote:

> Also the performance remains identical when running the same script from
> jupyter terminal instead of normal terminal. In the script the spark
> context is created by
>
> spark = SparkSession \
> .builder \
> ..
> ..
> getOrCreate() command
>
>
> On Wed, Sep 11, 2019 at 10:28 PM Dhrubajyoti Hati 
> wrote:
>
>> If you say that libraries are not transferred by default and in my case I
>> haven't used any --py-files then just because the driver python is
>> different I have facing 6x speed difference ? I am using client mode to
>> submit the program but the udfs and all are executed in the executors, then
>> why is the difference so much?
>>
>> I tried the prints
>> For jupyter one the driver prints
>> ../../jupyter-folder/venv
>>
>> and executors print /usr
>>
>> For spark-submit both of them print /usr
>>
>> The cluster is created few years back and used organisation wide. So how
>> python 2.6.6 is installed, i honestly do not know.  I copied the whole
>> jupyter from org git repo as it was shared, so i do not know how the venv
>> was created or python for venv was created even.
>>
>> The os is CentOS release 6.9 (Final)
>>
>>
>>
>>
>>
>> *Regards,Dhrubajyoti Hati.Mob No: 9886428028/9652029028*
>>
>>
>> On Wed, Sep 11, 2019 at 8:22 PM Abdeali Kothari 
>> wrote:
>>
>>> The driver python may not always be the same as the executor python.
>>> You can set these using PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON
>>>
>>> The dependent libraries are not transferred by spark in any way unless
>>> you do a --py-files or .addPyFile()
>>>
>>> Could you try this:
>>>
>>>     import sys; print(sys.prefix)
>>>
>>> on the driver, and also run this inside a UDF with:
>>>
>>>     def dummy(a):
>>>         import sys; raise AssertionError(sys.prefix)
>>>
>>> and get the traceback exception on the driver ?
>>> This would be the best way to get the exact sys.prefix (python path) for
>>> both the executors and driver.
>>>
>>> Also, could you elaborate on what environment is this ?
>>> Linux? - CentOS/Ubuntu/etc. ?
>>> How was the py 2.6.6 installed ?
>>> How was the py 2.7.5 venv created, and how was the base py 2.7.5
>>> installed?
>>>
>>> Also, how are you creating the Spark Session in jupyter ?
>>>
>>>
>>> On Wed, Sep 11, 2019 at 7:33 PM Dhrubajyoti Hati 
>>> wrote:
>>>
>>>> But would it be the case for multiple tasks running on the same worker
>>>> and also both the tasks are running in client mode, so the one true is true
>>>> for both or for neither. As mentioned earlier all the confs are same. I
>>>> have checked and compared each conf.
>>>>
>>>> As Abdeali mentioned it must be because the  way libraries are in both
>>>> the environments. Also i verified by running the same script for jupyter
>>>> environment and was able to get the same result using the normal script
>>>> which i was running with spark-submit.
>>>>
>>>> Currently i am searching for the ways the python packages are
>>>> transferred from driver to spark cluster in client mode. Any info on that
>>>> topic would be helpful.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>> On Wed, 11 Sep, 2019, 7:06 PM Patrick McCarthy, <
>>>> pmccar...@dstillery.com> wrote:
>>>>
>>>>> Are you running in cluster mode? A large virtualenv zip for the driver
>>>>> sent into the cluster on a slow pipe could account for much of that eight
>>>>> minutes.
>>>>>
>>>>> On Wed, Sep 11, 2019 at 3:17 AM Dhrubajyoti Hati <
>>>>> dhruba.w...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I just ran the same script in a shell in jupyter notebook and find
>>>>>> the performance to be similar. So I can confirm this is because the
>>>>>> libraries used jupyter notebook python is different than th

Re: script running in jupyter 6-7x faster than spark submit

2019-09-11 Thread Abdeali Kothari
The driver python may not always be the same as the executor python.
You can set these using PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON
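
For example, a sketch of pinning both to the same interpreter before the session is created (the venv path is a placeholder):

    import os

    venv_python = "/path/to/venv/bin/python"             # placeholder path
    os.environ["PYSPARK_PYTHON"] = venv_python           # python used by the executors
    os.environ["PYSPARK_DRIVER_PYTHON"] = venv_python    # python used by the driver

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()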

The dependent libraries are not transferred by spark in any way unless you
do a --py-files or .addPyFile()

Could you try this:

    import sys; print(sys.prefix)

on the driver, and also run this inside a UDF with:

    def dummy(a):
        import sys; raise AssertionError(sys.prefix)

and get the traceback exception on the driver ?
This would be the best way to get the exact sys.prefix (python path) for
both the executors and driver.
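
For example, something like this should surface it (a sketch; df and the column name are placeholders for whatever DataFrame you are already using):

    from pyspark.sql import functions as F, types as T

    dummy_udf = F.udf(dummy, T.StringType())
    df.select(dummy_udf(F.col('some_column'))).collect()
    # the AssertionError text in the executor traceback is the executor's sys.prefix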

Also, could you elaborate on what environment is this ?
Linux? - CentOS/Ubuntu/etc. ?
How was the py 2.6.6 installed ?
How was the py 2.7.5 venv created, and how was the base py 2.7.5 installed?

Also, how are you creating the Spark Session in jupyter ?


On Wed, Sep 11, 2019 at 7:33 PM Dhrubajyoti Hati 
wrote:

> But would it be the case for multiple tasks running on the same worker and
> also both the tasks are running in client mode, so the one true is true for
> both or for neither. As mentioned earlier all the confs are same. I have
> checked and compared each conf.
>
> As Abdeali mentioned it must be because the  way libraries are in both the
> environments. Also i verified by running the same script for jupyter
> environment and was able to get the same result using the normal script
> which i was running with spark-submit.
>
> Currently i am searching for the ways the python packages are transferred
> from driver to spark cluster in client mode. Any info on that topic would
> be helpful.
>
> Thanks!
>
>
>
> On Wed, 11 Sep, 2019, 7:06 PM Patrick McCarthy, 
> wrote:
>
>> Are you running in cluster mode? A large virtualenv zip for the driver
>> sent into the cluster on a slow pipe could account for much of that eight
>> minutes.
>>
>> On Wed, Sep 11, 2019 at 3:17 AM Dhrubajyoti Hati 
>> wrote:
>>
>>> Hi,
>>>
>>> I just ran the same script in a shell in jupyter notebook and find the
>>> performance to be similar. So I can confirm this is because the libraries
>>> used jupyter notebook python is different than the spark-submit python this
>>> is happening.
>>>
>>> But now I have a following question. Are the dependent libraries in a
>>> python script also transferred to the worker machines when executing a
>>> python script in spark. Because though the driver python versions are
>>> different, the workers machines will use their same python environment to
>>> run the code. If anyone can explain this part, it would be helpful.
>>>
>>>
>>>
>>>
>>> *Regards,Dhrubajyoti Hati.Mob No: 9886428028/9652029028*
>>>
>>>
>>> On Wed, Sep 11, 2019 at 9:45 AM Dhrubajyoti Hati 
>>> wrote:
>>>
>>>> Just checked from where the script is submitted i.e. wrt Driver, the
>>>> python env are different. Jupyter one is running within a the virtual
>>>> environment which is Python 2.7.5 and the spark-submit one uses 2.6.6. But
>>>> the executors have the same python version right? I tried doing a
>>>> spark-submit from jupyter shell, it fails to find python 2.7  which is not
>>>> there hence throws error.
>>>>
>>>> Here is the udf which might take time:
>>>>
>>>> import base64
>>>> import zlib
>>>>
>>>> def decompress(data):
>>>>     bytecode = base64.b64decode(data)
>>>>     d = zlib.decompressobj(32 + zlib.MAX_WBITS)
>>>>     decompressed_data = d.decompress(bytecode)
>>>>     return decompressed_data.decode('utf-8')
>>>>
>>>>
>>>> Could this be because of the mismatch between the two python environments on the
>>>> driver side? But the processing happens on the executor side?
>>>>
>>>>
>>>>
>>>>
>>>> *Regards,Dhrub*
>>>>
>>>> On Wed, Sep 11, 2019 at 8:59 AM Abdeali Kothari <
>>>> abdealikoth...@gmail.com> wrote:
>>>>
>>>>> Maybe you can try running it in a python shell or
>>>>> jupyter-console/ipython instead of a spark-submit and check how much time
>>>>> it takes too.
>>>>>
>>>>> Compare the env variables to check that no additional env
>>>>> configuration is present in either environment.
>>>>>
>>>>> Also is the python environment for both the exact same? I ask because
>>>>> it looks like you're using a UDF and if the J

Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Abdeali Kothari
Maybe you can try running it in a python shell or jupyter-console/ipython
instead of a spark-submit and check how much time it takes too.

Compare the env variables to check that no additional env configuration is
present in either environment.

Also is the python environment for both the exact same? I ask because it
looks like you're using a UDF and if the Jupyter python has (let's say)
numpy compiled with blas it would be faster than a numpy without it. Etc.
I.E. Some library you use may be using pure python and another may be using
a faster C extension...
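
(A quick way to check the numpy part of that, if numpy is involved - run this in both the jupyter python and the spark-submit python and compare:)

    import numpy as np
    np.show_config()   # prints which BLAS/LAPACK libraries numpy was built against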

What python libraries are you using in the UDFs? It you don't use UDFs at
all and use some very simple pure spark functions does the time difference
still exist?

Also are you using dynamic allocation or some similar spark config which
could vary performance between runs because the same resources were not
utilized on Jupyter / spark-submit?


On Wed, Sep 11, 2019, 08:43 Stephen Boesch  wrote:

> Sounds like you have done your homework to properly compare .   I'm
> guessing the answer to the following is yes .. but in any case:  are they
> both running against the same spark cluster with the same configuration
> parameters especially executor memory and number of workers?
>
> On Tue, 10 Sep 2019 at 20:05, Dhrubajyoti Hati <
> dhruba.w...@gmail.com>:
>
>> No, i checked for that, hence written "brand new" jupyter notebook. Also
>> the time taken by both are 30 mins and ~3hrs as i am reading a 500  gigs
>> compressed base64 encoded text data from a hive table and decompressing and
>> decoding in one of the udfs. Also the time compared is from Spark UI not
>> how long the job actually takes after submission. Its just the running time
>> i am comparing/mentioning.
>>
>> As mentioned earlier, all the spark conf params even match in two scripts
>> and that's why i am puzzled what going on.
>>
>> On Wed, 11 Sep, 2019, 12:44 AM Patrick McCarthy, 
>> wrote:
>>
>>> It's not obvious from what you pasted, but perhaps the juypter notebook
>>> already is connected to a running spark context, while spark-submit needs
>>> to get a new spot in the (YARN?) queue.
>>>
>>> I would check the cluster job IDs for both to ensure you're getting new
>>> cluster tasks for each.
>>>
>>> On Tue, Sep 10, 2019 at 2:33 PM Dhrubajyoti Hati 
>>> wrote:
>>>
 Hi,

 I am facing a weird behaviour while running a python script. Here is
 what the code looks like mostly:

 def fn1(ip):
some code...
 ...

 def fn2(row):
 ...
 some operations
 ...
 return row1


 udf_fn1 = udf(fn1)
 cdf = spark.read.table("") //hive table is of size > 500 Gigs with
 ~4500 partitions
 ddf = cdf.withColumn("coly", udf_fn1(cdf.colz)) \
 .drop("colz") \
 .withColumnRenamed("colz", "coly")

 edf = ddf \
 .filter(ddf.colp == 'some_value') \
 .rdd.map(lambda row: fn2(row)) \
 .toDF()

 print edf.count() // simple way for the performance test in both
 platforms

 Now when I run the same code in a brand new jupyter notebook it runs 6x
 faster than when I run this python script using spark-submit. The
 configurations are printed and  compared from both the platforms and they
 are exact same. I even tried to run this script in a single cell of jupyter
 notebook and still have the same performance. I need to understand if I am
 missing something in the spark-submit which is causing the issue.  I tried
 to minimise the script to reproduce the same error without much code.

 Both are run in client mode on a yarn based spark cluster. The machines
 from which both are executed are also the same and from same user.

 What i found is the  the quantile values for median for one ran with
 jupyter was 1.3 mins and one ran with spark-submit was ~8.5 mins.  I am not
 able to figure out why this is happening.

 Any one faced this kind of issue before or know how to resolve this?

 *Regards,*
 *Dhrub*

>>>
>>>
>>> --
>>>
>>>
>>> *Patrick McCarthy  *
>>>
>>> Senior Data Scientist, Machine Learning Engineering
>>>
>>> Dstillery
>>>
>>> 470 Park Ave South, 17th Floor, NYC 10016
>>>
>>


Re: Usage of PyArrow in Spark

2019-07-18 Thread Abdeali Kothari
I was thinking of implementing that. But quickly realized that doing a
conversion of Spark -> Pandas -> Python causes errors.

A quick example being "None" in Numeric data types.
Pandas supports only NaN. Spark supports NULL and NaN.

This is just one of the issues I came to.
I'm not sure about some of the more complex types like Array, Map, struct
which are internally converted to pd.Series with type being object.

I think that avoiding pandas in between and doing something from Arrow to
Python would be more efficient as, if I understand right, Arrow has a wider
range of types and can handle this better.

>>> from pyspark.sql import functions as F
>>> sdf = spark.createDataFrame([ [None], [float('nan')], [1.1] ], ['val'])

# Return the column with no change
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col)
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
++-+
| val|(val)|
++-+
|null| null|
| NaN| null|
| 1.1|  1.1|
++-+

# isnull()
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col:
col.isnull())
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
++-+
| val|(val)|
++-+
|null|  1.0|
| NaN|  1.0|
| 1.1|  0.0|
++-+

# Check for "is None"
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col:
col.apply(lambda x: x is None))
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
++-+
| val|(val)|
++-+
|null|  0.0|
| NaN|  0.0|
| 1.1|  0.0|
++-+

On Wed, Jul 17, 2019 at 4:47 PM Hyukjin Kwon  wrote:

> Regular Python UDFs don't use PyArrow under the hood.
> Yes, they can potentially benefit but they can be easily worked around via
> Pandas UDFs.
>
> For instance, both below are virtually identical.
>
> @udf(...)
> def func(col):
> return col
>
> @pandas_udf(...)
> def pandas_func(col):
> return col.apply(lambda x: x)
>
> If we only need some minimised change, I would be positive about adding
> Arrow support into regular Python UDFs. Otherwise, I am not sure yet.
>
>
> On Wed, Jul 17, 2019 at 1:19 PM, Abdeali Kothari wrote:
>
>> Hi,
>> In spark 2.3+ I saw that pyarrow was being used in a bunch of places in
>> spark. And I was trying to understand the benefit in terms of serialization
>> / deserialization it provides.
>>
>> I understand that the new pandas-udf works only if pyarrow is installed.
>> But what about the plain old PythonUDF which can be used in map() kind of
>> operations?
>> Are they also using pyarrow under the hood to reduce the cost of serde?
>> Or do they remain as earlier and no performance gain should be expected in
>> those?
>>
>> If I'm not mistaken, plain old PythonUDFs could also benefit from Arrow
>> as the data transfer cost to serialize/deserialzie from Java to Python and
>> back still exists and could potentially be reduced by using Arrow?
>> Is my understanding correct? Are there any plans to implement this?
>>
>> Pointers to any notes or Jira about this would be appreciated.
>>
>


Usage of PyArrow in Spark

2019-07-16 Thread Abdeali Kothari
Hi,
In spark 2.3+ I saw that pyarrow was being used in a bunch of places in
spark. And I was trying to understand the benefit in terms of serialization
/ deserialization it provides.

I understand that the new pandas-udf works only if pyarrow is installed.
But what about the plain old PythonUDF which can be used in map() kind of
operations?
Are they also using pyarrow under the hood to reduce the cost of serde? Or
do they remain as earlier and no performance gain should be expected in
those?

If I'm not mistaken, plain old PythonUDFs could also benefit from Arrow as
the data transfer cost to serialize/deserialize from Java to Python and
back still exists and could potentially be reduced by using Arrow?
Is my understanding correct? Are there any plans to implement this?

Pointers to any notes or Jira about this would be appreciated.


Re: [pyspark 2.3+] CountDistinct

2019-07-01 Thread Abdeali Kothari
I can't exactly reproduce this. Here is what I tried quickly:

import uuid

import findspark
findspark.init()  # noqa
import pyspark
from pyspark.sql import functions as F  # noqa: N812

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    [str(uuid.uuid4())] for i in range(45)
], ['col1'])

print('>>>> Spark version:', spark.sparkContext.version)
print('>>>> Null count:', df.filter(F.col('col1').isNull()).count())
print('>>>> Value count:', df.filter(F.col('col1').isNotNull()).count())
print('>>>> Distinct Count 1:',
df.agg(F.countDistinct(F.col('col1'))).collect()[0][0])
print('>>>> Distinct Count 2:',
df.agg(F.countDistinct(F.col('col1'))).collect()[0][0])

This always returns:
>>>> Spark version: 2.4.0
>>>> Null count: 0
>>>> Value count: 45
>>>> Distinct Count 1: 45
>>>> Distinct Count 2: 45
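
For the approx_count_distinct part of the original question - it is a HyperLogLog-based estimate with a tunable maximum relative error (rsd), so it trades exactness for speed. A small sketch on the same dataframe as above:

    print('>>>> Approx distinct (rsd=0.05):',
          df.agg(F.approx_count_distinct('col1', rsd=0.05)).collect()[0][0])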




On Sat, Jun 29, 2019 at 6:51 PM Rishi Shah  wrote:

> Thanks Abdeali! Please find details below:
>
> df.agg(countDistinct(col('col1'))).show() --> 450089
> df.agg(countDistinct(col('col1'))).show() --> 450076
> df.filter(col('col1').isNull()).count() --> 0
> df.filter(col('col1').isNotNull()).count() --> 450063
>
> col1 is a string
> Spark version 2.4.0
> datasize: ~ 500GB
>
>
> On Sat, Jun 29, 2019 at 5:33 AM Abdeali Kothari 
> wrote:
>
>> How large is the data frame and what data type are you counting distinct
>> for?
>> I use count distinct quite a bit and haven't noticed any thing peculiar.
>>
>> Also, which exact version in 2.3.x?
> And, are you performing any operations on the DF before the countDistinct?
>>
>> I recall there was a bug when I did countDistinct(PythonUDF(x)) in the
>> same query which was resolved in one of the minor versions in 2.3.x
>>
>> On Sat, Jun 29, 2019, 10:32 Rishi Shah  wrote:
>>
>>> Hi All,
>>>
>>> Just wanted to check in to see if anyone has any insight about this
>>> behavior. Any pointers would help.
>>>
>>> Thanks,
>>> Rishi
>>>
>>> On Fri, Jun 14, 2019 at 7:05 AM Rishi Shah 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Recently we noticed that countDistinct on a larger dataframe doesn't
>>>> always return the same value. Any idea? If this is the case then what is
>>>> the difference between countDistinct & approx_count_distinct?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>
> --
> Regards,
>
> Rishi Shah
>


Re: [pyspark 2.3+] CountDistinct

2019-06-29 Thread Abdeali Kothari
How large is the data frame and what data type are you counting distinct
for?
I use count distinct quite a bit and haven't noticed anything peculiar.

Also, which exact version in 2.3.x?
And, are you performing any operations on the DF before the countDistinct?

I recall there was a bug when I did countDistinct(PythonUDF(x)) in the same
query which was resolved in one of the minor versions in 2.3.x

On Sat, Jun 29, 2019, 10:32 Rishi Shah  wrote:

> Hi All,
>
> Just wanted to check in to see if anyone has any insight about this
> behavior. Any pointers would help.
>
> Thanks,
> Rishi
>
> On Fri, Jun 14, 2019 at 7:05 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> Recently we noticed that countDistinct on a larger dataframe doesn't
>> always return the same value. Any idea? If this is the case then what is
>> the difference between countDistinct & approx_count_distinct?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


Re: [spark on yarn] spark on yarn without DFS

2019-05-19 Thread Abdeali Kothari
While spark can read from S3 directly in EMR, I believe it still needs
HDFS to perform shuffles and to write intermediate data to disk when
running jobs (i.e. when the in-memory data needs to spill over to disk).

For these operations, Spark does need a distributed file system - You could
use something like EMRFS (which is like a HDFS backed by S3) on Amazon.

The issue could be something else too - so a stacktrace or error message
could help in understanding the problem.



On Mon, May 20, 2019, 07:20 Huizhe Wang  wrote:

> Hi,
>
> I want to use Spark on YARN without HDFS. I store my resources in AWS and
> use s3a to get them. However, when I ran stop-dfs.sh it stopped the NameNode and
> DataNode, and I got an error when using yarn cluster mode. Can I use YARN
> without starting DFS, and how could I use this mode?
>
> Yours,
> Jane
>


Re: spark-sklearn

2019-04-08 Thread Abdeali Kothari
I haven't used spark-sklearn much, but their travis file gives the
combination they test with:
https://github.com/databricks/spark-sklearn/blob/master/.travis.yml#L8
Also, your first email is a bit confusing - you mentioned Spark 2.2.3 but
the traceback path says spark-2.4.1-bin-hadoop2.6

I then tried with the same pip versions you mentioned (but I used python
3.6 and spark-2.4.1-bin-hadoop2.7 though) - and it still worked for me



On Tue, Apr 9, 2019, 01:52 Sudhir Babu Pothineni 
wrote:

> Thanks Stephen, saw that, but this is already released version of
> spark-sklearn-0.3.0, tests should be working.
>
> So just checking if I am doing anything wrong, version of other libraries
> etc..
>
> Thanks
> Sudhir
>
> On Apr 8, 2019, at 1:52 PM, Stephen Boesch  wrote:
>
> There are several suggestions on this SOF
> https://stackoverflow.com/questions/38984775/spark-errorexpected-zero-arguments-for-construction-of-classdict-for-numpy-cor
>
> 1
>
> You need to convert the final value to a python list. You implement the
> function as follows:
>
> def uniq_array(col_array):
> x = np.unique(col_array)
> return list(x)
>
> This is because Spark doesn't understand the numpy array format. In order
> to feed a python object that Spark DataFrames understand as an ArrayType,
> you need to convert the output to a python list before returning it.
>
>
>
>
> The source of the problem is that object returned from the UDF doesn't
> conform to the declared type. np.unique not only returns numpy.ndarray but
> also converts numerics to the corresponding NumPy types which are not
> compatible  with
> DataFrame API. You can try something like this:
>
> udf(lambda x: list(set(x)), ArrayType(IntegerType()))
>
> or this (to keep order)
>
> udf(lambda xs: list(OrderedDict((x, None) for x in xs)),
> ArrayType(IntegerType()))
>
> instead.
>
> If you really want np.unique you have to convert the output:
>
> udf(lambda x: np.unique(x).tolist(), ArrayType(IntegerType()))
>
>
>
>
>
>
>
>
>
>
>
>
>
> On Mon, 8 Apr 2019 at 11:43, Sudhir Babu Pothineni <
> sbpothin...@gmail.com>:
>
>>
>>
>>
>> Trying to run tests in spark-sklearn, anybody check the below exception
>>
>> pip freeze:
>>
>> nose==1.3.7
>> numpy==1.16.1
>> pandas==0.19.2
>> python-dateutil==2.7.5
>> pytz==2018.9
>> scikit-learn==0.19.2
>> scipy==1.2.0
>> six==1.12.0
>> spark-sklearn==0.3.0
>>
>> Spark version:
>> spark-2.2.3-bin-hadoop2.6/bin/pyspark
>>
>>
>> running into following exception:
>>
>> ==
>> ERROR: test_scipy_sparse (spark_sklearn.converter_test.CSRVectorUDTTests)
>> --
>> Traceback (most recent call last):
>>   File
>> "/home/spothineni/Downloads/spark-sklearn-release-0.3.0/python/spark_sklearn/converter_test.py",
>> line 83, in test_scipy_sparse
>> self.assertEqual(df.count(), 1)
>>   File
>> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py",
>> line 522, in count
>> return int(self._jdf.count())
>>   File
>> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>> line 1257, in __call__
>> answer, self.gateway_client, self.target_id, self.name)
>>   File
>> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/pyspark/sql/utils.py",
>> line 63, in deco
>> return f(*a, **kw)
>>   File
>> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>> line 328, in get_return_value
>> format(target_id, ".", name), value)
>> Py4JJavaError: An error occurred while calling o652.count.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in
>> stage 0.0 (TID 11, localhost, executor driver):
>> net.razorvine.pickle.PickleException: expected zero arguments for
>> construction of ClassDict (for numpy.dtype)
>> at
>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>> at
>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
>> at
>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
>> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>> at 

Re: Qn about decision tree apache spark java

2019-04-04 Thread Abdeali Kothari
The dataset is in a fairly popular data format called the libsvm format,
popularized by the libsvm library.

http://svmlight.joachims.org - The 'How to Use' section describes the file
format.

XGBoost uses the same file format and their documentation is here -
https://xgboost.readthedocs.io/en/latest/tutorials/input_format.html
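
To make the format concrete: each line is `<label> <index1>:<value1> <index2>:<value2> ...`, so yes, the first column is the label and each index:value pair is a sparse feature entry. A small sketch of loading it and printing predictions (not verbatim from the linked example; `model` is assumed to be a fitted DecisionTreeClassificationModel and `test_df` a held-out split, as in that example):

    df = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    df.printSchema()    # label: double, features: vector (sparse)

    predictions = model.transform(test_df)               # model / test_df as in the linked example
    predictions.select("prediction", "label", "features").show(5)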


On Fri, Apr 5, 2019, 03:13 Serena S Yuan  wrote:

> Hi,
> I am trying to use apache spark's decision tree classifier. I am
> trying to implement the method found in
> https://spark.apache.org/docs/1.5.1/ml-decision-tree.html 's
> classification example. I found the dataset at
>
> https://github.com/apache/spark/blob/master/data/mllib/sample_libsvm_data.txt
> and I have some trouble understanding its format. Is the first column
> the label? Why are there indices and a colon in front of other number
> values and what do the indices represent?
>   Lastly, how do I print out a prediction given new test data?
>
> Thanks,
>Serena Sian Yuan
>
> --
> Sian Ees Super.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: pickling a udf

2019-04-04 Thread Abdeali Kothari
The syntax looks right.
Are you still getting the error when you open a new python session and run
this same code ?
Are you running on your laptop with spark local mode or are you running
this on a yarn based cluster ?
It does seem like something in your python session isn't getting serialized
right, but it does not look like it's related to this code snippet.

On Thu, Apr 4, 2019 at 3:49 PM Adaryl Wakefield <
adaryl.wakefi...@hotmail.com> wrote:

> Are we not supposed to be using udfs anymore? I copied an example straight
> from a book and I’m getting weird results and I think it’s because the book
> is using a much older version of Spark.  The code below is pretty straight
> forward but I’m getting an error none the less. I’ve been doing a bunch of
> googling and not getting much results.
>
>
>
> from pyspark.sql import SparkSession
>
> from pyspark.sql.functions import *
>
> from pyspark.sql.types import *
>
>
>
> spark = SparkSession \
>
> .builder \
>
> .appName("Python Spark SQL basic example") \
>
> .getOrCreate()
>
>
>
> df = spark.read.csv("full201801.dat",header="true")
>
>
>
> columntransform = udf(lambda x: 'Non-Fat Dry Milk' if x == '23040010' else
> 'foo', StringType())
>
>
>
> df.select(df.PRODUCT_NC,
> columntransform(df.PRODUCT_NC).alias('COMMODITY')).show()
>
>
>
> Error.
>
> *Py4JJavaError*: An error occurred while calling o110.showString.
>
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 2.0 (TID 2, localhost, executor driver):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>
>   File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 242, in
> main
>
>   File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 144, in
> read_udfs
>
>   File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 120, in
> read_single_udf
>
>   File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 60, in
> read_command
>
>   File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 171,
> in _read_with_length
>
> return self.loads(obj)
>
>   File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 566,
> in loads
>
> return pickle.loads(obj, encoding=encoding)
>
> TypeError: _fill_function() missing 4 required positional arguments:
> 'defaults', 'dict', 'module', and 'closure_values'
>
>
>
>
>
> B.
>
>
>
>
>
>
>


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Abdeali Kothari
How long does it take to do the window solution ? (Also mention how many
executors was your spark application using on average during that time)
I am not aware of anything that is faster. When I ran it on my data (~8-9 GB)
I think it took less than 5 mins (don't remember the exact time).

On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri 
wrote:

> Thanks for awesome clarification / explanation.
>
> I have cases where update_time can be same.
> I am in need of suggestions, where I have very large data like 5 GB, this
> window based solution which I mentioned is taking very long time.
>
> Thanks again.
>
> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari 
> wrote:
>
>> So, the above code for min() worked for me fine in general, but there was
>> one corner case where it failed.
>> Which was when I have something like:
>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>
>> In this example, the update_time for 2 records is the exact same. So,
>> doing a filter for the min() will result in 2 records for the invoice_id=1.
>> This is avoided in your code snippet of row_num - because 2 rows will
>> never have row_num = 1
>>
>> But note that here - row_num=1 and row_num=2 will be randomly ordered
>> (because orderBy is on update_time and they have the same value of
>> update_time).
>> Hence dropDuplicates can be used there cause it can be either one of
>> those rows.
>>
>> Overall - dropDuplicates seems like it's meant for cases where you
>> literally have redundant duplicated data. And not for filtering to get
>> first/last etc.
>>
>>
>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Abdeali, Thank you for your response.
>>>
>>> Can you please explain me this line, And the dropDuplicates at the end
>>> ensures records with two values for the same 'update_time' don't cause
>>> issues.
>>>
>>> Sorry I didn't get quickly. :)
>>>
>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>> abdealikoth...@gmail.com> wrote:
>>>
>>>> I've faced this issue too - and a colleague pointed me to the
>>>> documentation -
>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>> dropDuplicates docs does not say that it will guarantee that it will
>>>> return the "first" record (even if you sort your dataframe)
>>>> It would give you any record it finds and just ensure that duplicates
>>>> are not present.
>>>>
>>>> The only way I know of how to do this is what you did, but you can
>>>> avoid the sorting inside the partition with something like (in pyspark):
>>>>
>>>> from pyspark.sql import Window, functions as F
>>>> df = df.withColumn('wanted_time',
>>>>     F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>>>     .drop('wanted_time').dropDuplicates(['invoice_id', 'update_time']))
>>>>
>>>> The min() is faster than doing an orderBy() and a row_number().
>>>> And the dropDuplicates at the end ensures records with two values for
>>>> the same 'update_time' don't cause issues.
>>>>
>>>>
>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hello Dear Spark Users,
>>>>>
>>>>> I am using dropDuplicate on a DataFrame generated from large parquet
>>>>> file from(HDFS) and doing dropDuplicate based on timestamp based column,
>>>>> every time I run it drops different - different rows based on same
>>>>> timestamp.
>>>>>
>>>>> What I tried and worked
>>>>>
>>>>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".
>>>>> desc)
>>>>>
>>>>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>>>>> row_number.over(wSpec)).where($"rn" === 1)
>>>>> .drop("rn").drop("update_time")
>>>>>
>>>>> But this is damn slow...
>>>>>
>>>>> Can someone please throw a light.
>>>>>
>>>>> Thanks
>>>>>
>>>>>


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Abdeali Kothari
So, the above code for min() worked for me fine in general, but there was
one corner case where it failed.
Which was when I have something like:
invoice_id=1, update_time=*2018-01-01 15:00:00.000*
invoice_id=1, update_time=*2018-01-01 15:00:00.000*
invoice_id=1, update_time=2018-02-03 14:00:00.000

In this example, the update_time for 2 records is the exact same. So, doing
a filter for the min() will result in 2 records for the invoice_id=1.
This is avoided in your code snippet of row_num - because 2 rows will never
have row_num = 1

But note that here - row_num=1 and row_num=2 will be randomly ordered
(because orderBy is on update_time and they have the same value of
update_time).
Hence dropDuplicates can be used there cause it can be either one of those
rows.

Overall - dropDuplicates seems like it's meant for cases where you
literally have redundant duplicated data. And not for filtering to get
first/last etc.


On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri 
wrote:

> Hello Abdeali, Thank you for your response.
>
> Can you please explain this line to me: "And the dropDuplicates at the end
> ensures records with two values for the same 'update_time' don't cause
> issues."
>
> Sorry, I didn't get it at first. :)
>
> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari 
> wrote:
>
>> I've faced this issue too - and a colleague pointed me to the
>> documentation -
>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>> dropDuplicates docs does not say that it will guarantee that it will
>> return the "first" record (even if you sort your dataframe)
>> It would give you any record it finds and just ensure that duplicates are
>> not present.
>>
>> The only way I know of how to do this is what you did, but you can avoid
>> the sorting inside the partition with something like (in pyspark):
>>
>> from pyspark.sql import Window, functions as F
>> df = df.withColumn('wanted_time',
>> F.min('update_time').over(Window.partitionBy('invoice_id')))
>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>   .drop('wanted_time').dropDuplicates(['invoice_id', 'update_time']))
>>
>> The min() is faster than doing an orderBy() and a row_number().
>> And the dropDuplicates at the end ensures records with two values for the
>> same 'update_time' don't cause issues.
>>
>>
>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Dear Spark Users,
>>>
>>> I am using dropDuplicates on a DataFrame generated from a large Parquet
>>> file (from HDFS), de-duplicating on a timestamp-based column, and every
>>> time I run it, it drops different rows for the same timestamp.
>>>
>>> What I tried and worked
>>>
>>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".
>>> desc)
>>>
>>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>>> row_number.over(wSpec)).where($"rn" === 1)
>>> .drop("rn").drop("update_time")
>>>
>>> But this is damn slow...
>>>
>>> Can someone please shed some light on this?
>>>
>>> Thanks
>>>
>>>


Re: dropDuplicate on timestamp based column unexpected output

2019-04-03 Thread Abdeali Kothari
I've faced this issue too, and a colleague pointed me to the documentation:
https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
The dropDuplicates docs do not say it guarantees returning the "first" record
(even if you sort your DataFrame); it gives you whichever record it finds and
just ensures that duplicates are not present.

The only way I know of how to do this is what you did, but you can avoid
the sorting inside the partition with something like (in pyspark):

from pyspark.sql import Window, functions as F

df = df.withColumn(
    'wanted_time',
    F.min('update_time').over(Window.partitionBy('invoice_id')))
out_df = (df.filter(df['update_time'] == df['wanted_time'])
            .drop('wanted_time')
            .dropDuplicates(['invoice_id', 'update_time']))

The min() is faster than doing an orderBy() and a row_number().
And the dropDuplicates at the end ensures records with two values for the
same 'update_time' don't cause issues.


On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri 
wrote:

> Hello Dear Spark Users,
>
> I am using dropDuplicates on a DataFrame generated from a large Parquet file
> (from HDFS), de-duplicating on a timestamp-based column, and every time I run
> it, it drops different rows for the same timestamp.
>
> What I tried and worked
>
> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".
> desc)
>
> val irqDistinctDF = irqFilteredDF.withColumn("rn",
> row_number.over(wSpec)).where($"rn" === 1) .drop("rn").drop("update_time")
>
> But this is damn slow...
>
> Can someone please shed some light on this?
>
> Thanks
>
>


Re: Occasional broadcast timeout when dynamic allocation is on

2019-02-26 Thread Abdeali Kothari
I've been facing this issue for the past few months too.
I always thought it was an infrastructure issue, but we were never able to
figure out what the infra issue was.

If others are facing this issue too - then maybe it's a valid bug.

Does anyone have any ideas on how we can debug this?
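
One workaround sketch to experiment with while debugging (this is my own
suggestion, not a confirmed fix, and the values are only illustrative): give
the broadcast more than the default 300 seconds, and ask dynamic allocation to
keep a floor of executors so the first broadcast isn't racing a cold cluster.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # spark.sql.broadcastTimeout defaults to 300 s; allow for slow YARN ramp-up
    .config("spark.sql.broadcastTimeout", "1200")
    # keep a couple of executors alive so the broadcast job can start promptly
    .config("spark.dynamicAllocation.minExecutors", "2")
    .getOrCreate()
)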

On Fri, Feb 22, 2019, 16:10 Artem P wrote:

> Hi!
>
> We have dynamic allocation enabled for our regular jobs and sometimes they
> fail with java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]. It seems the Spark driver starts the broadcast just before the
> job has received any executors from YARN, and if it takes more than 5
> minutes to acquire them, the broadcast fails with a TimeoutException. Is
> there any way to force Spark to start the broadcast only after all executors
> are in place (at least the minExecutors count)?
>
> Relevant logs (it can be seen that the exception happened 5 minutes after the
> broadcast start message):
> ```
>
> 2019-02-22 06:11:48,047 [broadcast-exchange-0] INFO  
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator  - Code 
> generated in 361.70265 ms
> 2019-02-22 06:11:48,237 [broadcast-exchange-0] INFO  
> org.apache.spark.storage.memory.MemoryStore  - Block broadcast_0 stored as 
> values in memory (estimated size 485.3 KB, free 365.8 MB)
> 2019-02-22 06:11:48,297 [main] INFO  
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator  - Code 
> generated in 611.624073 ms
> 2019-02-22 06:11:48,522 [broadcast-exchange-0] INFO  
> org.apache.spark.storage.memory.MemoryStore  - Block broadcast_0_piece0 
> stored as bytes in memory (estimated size 63.5 KB, free 365.8 MB)
> 2019-02-22 06:11:48,524 [dispatcher-event-loop-4] INFO  
> org.apache.spark.storage.BlockManagerInfo  - Added broadcast_0_piece0 in 
> memory on ip-10-4-3-123.eu-central-1.compute.internal:42935 (size: 63.5 KB, 
> free: 366.2 MB)
> 2019-02-22 06:11:48,531 [broadcast-exchange-0] INFO  
> org.apache.spark.SparkContext  - Created broadcast 0 from run at 
> ThreadPoolExecutor.java:1149
> 2019-02-22 06:11:48,545 [broadcast-exchange-0] INFO  
> org.apache.spark.sql.execution.FileSourceScanExec  - Planning scan with bin 
> packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 
> bytes.
> 2019-02-22 06:11:48,859 [broadcast-exchange-0] INFO  
> org.apache.spark.SparkContext  - Starting job: run at 
> ThreadPoolExecutor.java:1149
> 2019-02-22 06:11:48,885 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Got job 0 (run at 
> ThreadPoolExecutor.java:1149) with 1 output partitions
> 2019-02-22 06:11:48,886 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Final stage: ResultStage 0 (run at 
> ThreadPoolExecutor.java:1149)
> 2019-02-22 06:11:48,893 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Parents of final stage: List()
> 2019-02-22 06:11:48,895 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Missing parents: List()
> 2019-02-22 06:11:48,940 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Submitting ResultStage 0 
> (MapPartitionsRDD[2] at run at ThreadPoolExecutor.java:1149), which has no 
> missing parents
> 2019-02-22 06:11:49,004 [dag-scheduler-event-loop] INFO  
> org.apache.spark.storage.memory.MemoryStore  - Block broadcast_1 stored as 
> values in memory (estimated size 11.7 KB, free 365.8 MB)
> 2019-02-22 06:11:49,024 [main] INFO  
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator  - Code 
> generated in 362.596124 ms
> 2019-02-22 06:11:49,054 [dag-scheduler-event-loop] INFO  
> org.apache.spark.storage.memory.MemoryStore  - Block broadcast_1_piece0 
> stored as bytes in memory (estimated size 5.3 KB, free 365.7 MB)
> 2019-02-22 06:11:49,055 [dispatcher-event-loop-1] INFO  
> org.apache.spark.storage.BlockManagerInfo  - Added broadcast_1_piece0 in 
> memory on ip-10-4-3-123.eu-central-1.compute.internal:42935 (size: 5.3 KB, 
> free: 366.2 MB)
> 2019-02-22 06:11:49,066 [dag-scheduler-event-loop] INFO  
> org.apache.spark.SparkContext  - Created broadcast 1 from broadcast at 
> DAGScheduler.scala:1079
> 2019-02-22 06:11:49,101 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Submitting 1 missing tasks from 
> ResultStage 0 (MapPartitionsRDD[2] at run at ThreadPoolExecutor.java:1149) 
> (first 15 tasks are for partitions Vector(0))
> 2019-02-22 06:11:49,103 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.cluster.YarnScheduler  - Adding task set 0.0 with 
> 1 tasks
> 2019-02-22 06:11:49,116 [main] INFO  
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator  - Code 
> generated in 56.99095 ms
> 2019-02-22 06:11:49,188 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.FairSchedulableBuilder  - Added task set 
> TaskSet_0.0 tasks to pool default
> 2019-02-22 06:12:04,190 [Timer-0] WARN  
> org.apache.spark.scheduler.cluster.YarnScheduler  - Initial 

Re: Spark dynamic allocation with special executor configuration

2019-02-25 Thread Abdeali Kothari
Yes, it will.
In general, Spark will spawn as many executors as fit within the resources
available on a node.
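
A minimal sketch of what that means in practice (the node size here is an
assumption for illustration only): if a worker node offers 16 cores and 64 GB
to YARN and each executor is sized to roughly half of that, YARN can place two
executors of the same application on that one node.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")  # needed for dynamic allocation on YARN
    # roughly half of the assumed 16-core / 64 GB NodeManager
    .config("spark.executor.cores", "8")
    .config("spark.executor.memory", "26g")
    .getOrCreate()
)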


On Tue, Feb 26, 2019, 11:59 Anton Puzanov wrote:

> Hello everyone,
>
> Spark has a dynamic resource allocation scheme where, when resources are
> available, the Spark manager will automatically add executors to the
> application.
>
> Spark's default configuration is for executors to allocate the entire worker
> node they run on, but this is configurable. My question is: if an executor is
> set to use half of the worker node, is it possible that Spark will spawn two
> executors belonging to the same application on the same worker node?
>
> Thanks,
> Anton.
>


Identifying cause of exception in PySpark

2018-12-09 Thread Abdeali Kothari
I seem to have an issue in Spark where I create a spark worker process and
listen for jobs from another machine. After about 24 hours and ~3000 jobs,
some jobs in my spark worker just hang indefinitely.

I am trying to set a timeout for my tasks so that the spark session can be
stopped and re-started if a job is taking more than 1 hour or so.

To do this, I send a signal and raise an exception, similar to this library:
https://github.com/pnpnpn/timeout-decorator
It seems to work well in plain Python, but not in PySpark.

When the timeout signal is sent in PySpark, py4j seems to catch it and throw a
py4j.protocol.Py4JError, and hence I cannot figure out whether the error was
caused by a timeout or by something else.

I am wondering how I can figure out what caused the original exception in
PySpark. Here is some example code that throws a similar error; in my `except`
block I am unable to figure out whether it was caused by MyExc or something
else:

import pyspark
from pyspark.sql import functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [['a1', 'b1', 'c1', 1],
     ['a2', 'b2', 'c2', 2],
     ['a3', 'b3', 'c3', 3]],
    ['a', 'b', 'c', 'x'])

class MyExc(Exception):
    pass

@pyspark.sql.functions.udf
def myudf(x):
    raise MyExc("my exception")
    return x

df = df.withColumn("x2", myudf(df['x']))
try:
    df.show()
except Exception as err:
    print("Got err", type(err), err)
    # import ipdb; ipdb.set_trace()
    raise
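
One sketch of a workaround (my own idea, not something PySpark documents for
this case), continuing the example above: the Py4JJavaError's message embeds
the Python worker's traceback text, so string-matching on the exception class
name is one crude way to tell a timeout apart from other failures.

from py4j.protocol import Py4JJavaError

try:
    df.show()
except Py4JJavaError as err:
    # str(err) includes the serialized Python traceback from the executor,
    # e.g. the "MyExc: my exception" line, so we can look for the class name
    if "MyExc" in str(err):
        print("UDF failed with MyExc (e.g. our timeout)")
    else:
        raise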


PicklingError - Can't pickle py4j.protocol.Py4JJavaError - it's not the same object

2018-12-02 Thread Abdeali Kothari
I am using Spark + celery to run some Spark scripts asynchronously from the
rest of my code.
When any of my celery tasks hits an error and throws a Python exception, the
celery on_error() handler is called and I can handle the exception easily by
logging it.

The only exception that fails to work seems to be the Py4JJavaError thrown by
Spark.
When my code generates a Py4JJavaError, I get an exception in celery's error
handling: it says the error could not be unpickled correctly because the two
are different entities.

I'm looking for clues as to what could cause this. I am able to import
py4j.protocol.Py4JJavaError and do some debugging.
I went into pyspark/sql/utils.py:capture_sql_exception(), which is where my
Py4JJavaError is being thrown, and found:
*py4j.__file__* =
/usr/local/hadoop/spark2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/__init__.py
*id(py4j.protocol.Py4JJavaError)* = 140436967669656

I also went to where the unpickling exception was occurring inside billiard
codebase and found:
*py4j.__file__* =
/usr/local/hadoop/spark2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/__init__.py
*id(py4j.protocol.Py4JJavaError)* =140436967669656

I'm confused as to why an error like this can come up when the Python id() of
both types is exactly the same, and the file loading them is also the same.
I was originally under the impression that multiple versions of py4j were
conflicting with each other, but that does not seem to be the case.

Any thoughts on this would be helpful! Thanks

---

Here is the exact error I get during the exception handling:

2018-12-02 18:11:41,403: ERROR/MainProcess] Task handler raised error:
, None)"'. Reason: ''PicklingError("Can\'t pickle
: it\'s not the same object as
py4j.protocol.Py4JJavaError",)''.>
Traceback (most recent call last):
  File "venv/lib/python3.6/site-packages/billiard/pool.py", line 363, in
workloop
put((READY, (job, i, result, inqW_fd)))
  File "venv/lib/python3.6/site-packages/billiard/queues.py", line 366, in
put
self.send_payload(ForkingPickler.dumps(obj))
  File "venv/lib/python3.6/site-packages/billiard/reduction.py", line 61,
in dumps
cls(buf, protocol).dump(obj)
billiard.pool.MaybeEncodingError: Error sending result: '"(1,
, None)"'. Reason:
''PicklingError("Can\'t pickle :
it\'s not the same object as py4j.protocol.Py4JJavaError",)''.
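
One workaround sketch (an assumption on my part, not a confirmed fix; the
names SparkJobError and run_job are hypothetical): convert the Py4JJavaError
into a plain, picklable exception before it leaves the celery task, so
billiard only ever has to pickle ordinary Python objects.

from py4j.protocol import Py4JJavaError

class SparkJobError(Exception):
    """Plain exception type that billiard/celery can pickle safely."""

def run_job(df):
    # hypothetical celery task body
    try:
        return df.count()
    except Py4JJavaError as err:
        # flatten the Java-side error into a string and re-raise a picklable type
        raise SparkJobError(str(err)) from None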


Re: Show function name in Logs for PythonUDFRunner

2018-11-22 Thread Abdeali Kothari
My understanding is that this log line is printed by PythonRunner.scala in the
Spark code base, but I may be mistaken.

On Thu, Nov 22, 2018, 17:54 Eike von Seggern wrote:

> Hi,
>
> Abdeali Kothari wrote on Thu, 22 Nov 2018
> at 10:04:
>
>> When I run Python UDFs with pyspark, I get multiple logs where it says:
>>
>> 18/11/22 01:51:59 INFO python.PythonUDFRunner: Times: total = 44, boot = 
>> -25, init = 67, finish = 2
>>
>>
>> I am wondering whether I can easily identify from these logs which of my
>> Python UDFs this timing information is for (I have about a hundred, so it's
>> quite difficult for me to identify this otherwise) ...
>>
>
> If the log is created using Python's logging module, it should be
> possible. This supports `funcName` (
> https://docs.python.org/3.6/library/logging.html#logrecord-attributes).
> But I do not know how to configure the log-format for pyspark.
>
>  HTH
>
> Eike
>


Show function name in Logs for PythonUDFRunner

2018-11-22 Thread Abdeali Kothari
When I run Python UDFs with pyspark, I get multiple logs where it says:

18/11/22 01:51:59 INFO python.PythonUDFRunner: Times: total = 44, boot
= -25, init = 67, finish = 2


I am wondering whether I can easily identify from these logs which of my
Python UDFs this timing information is for (I have about a hundred, so it's
quite difficult for me to identify this otherwise) ...
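
Since that timing line comes from the JVM side and does not carry the UDF
name, one lightweight workaround sketch (my own, with hypothetical names such
as timed_udf and clean_name) is to time and log inside the UDF itself; the
output then shows up in the executor logs rather than in the PythonUDFRunner
line, and it logs once per call, so it is best tried on a sample first.

import logging
import time

from pyspark.sql import functions as F

log = logging.getLogger("udf-timing")

def timed_udf(fn, return_type="string"):
    # Wrap a plain Python function so every call logs the function name
    # and the time spent inside it on the executor.
    def wrapper(*args):
        start = time.time()
        try:
            return fn(*args)
        finally:
            log.info("udf=%s elapsed=%.6fs", fn.__name__, time.time() - start)
    wrapper.__name__ = fn.__name__
    return F.udf(wrapper, return_type)

def clean_name(s):  # hypothetical example UDF
    return s.strip().title() if s else s

clean_name_udf = timed_udf(clean_name)
# df = df.withColumn("name_clean", clean_name_udf(df["name"]))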


Re: Repartition not working on a csv file

2018-07-01 Thread Abdeali Kothari
I prefer not to do a .cache() due to memory limits. But I did try a
persist() with DISK_ONLY

I did the repartition(), followed by a .count() followed by a persist() of
DISK_ONLY
That didn't change the number of tasks either



On Sun, Jul 1, 2018, 15:50 Alexander Czech 
wrote:

> You could try to force a repartition right at that point by producing a
> cached version of the DF with .cache() if memory allows it.
>
> On Sun, Jul 1, 2018 at 5:04 AM, Abdeali Kothari 
> wrote:
>
>> I've tried that too - it doesn't work. It does a repartition, but not
>> right after the broadcast join - it does a lot more processing and does the
>> repartition right before my next sort-merge join (stage 12 I described
>> above).
>> As the heavy processing happens before the sort-merge join, it still doesn't
>> help.
>>
>> On Sun, Jul 1, 2018, 08:30 yujhe.li  wrote:
>>
>>> Abdeali Kothari wrote
>>> > My entire CSV is less than 20KB.
>>> > By somewhere in between, I do a broadcast join with 3500 records in
>>> > another
>>> > file.
>>> > After the broadcast join I have a lot of processing to do. Overall, the
>>> > time to process a single record goes up-to 5mins on 1 executor
>>> >
>>> > I'm trying to increase the partitions that my data is in so that I
>>> have at
>>> > maximum 1 record per executor (currently it sets 2 tasks, and hence 2
>>> > executors... I want it to split it into at least 100 tasks at a time
>>> so I
>>> > get 5 records per task => ~20min per task)
>>>
>>> Maybe you can try repartition(100) after broadcast join, the task number
>>> should change to 100 for your later transformation.
>>>
>>>
>>>
>>>
>


Re: Repartition not working on a csv file

2018-06-30 Thread Abdeali Kothari
I've tried that too - it doesn't work. It does a repartition, but not right
after the broadcast join - it does a lot more processing and does the
repartition right before my next sort-merge join (stage 12 I described above).
As the heavy processing happens before the sort-merge join, it still doesn't
help.

On Sun, Jul 1, 2018, 08:30 yujhe.li  wrote:

> Abdeali Kothari wrote
> > My entire CSV is less than 20KB.
> > By somewhere in between, I do a broadcast join with 3500 records in
> > another
> > file.
> > After the broadcast join I have a lot of processing to do. Overall, the
> > time to process a single record goes up-to 5mins on 1 executor
> >
> > I'm trying to increase the partitions that my data is in so that I have
> at
> > maximum 1 record per executor (currently it sets 2 tasks, and hence 2
> > executors... I want it to split it into at least 100 tasks at a time so I
> > get 5 records per task => ~20min per task)
>
> Maybe you can try repartition(100) after broadcast join, the task number
> should change to 100 for your later transformation.
>
>
>
>


Re: Repartition not working on a csv file

2018-06-30 Thread Abdeali Kothari
My entire CSV is less than 20 KB.
Somewhere in between, I do a broadcast join with 3500 records from another
file.
After the broadcast join I have a lot of processing to do. Overall, the
time to process a single record goes up to 5 minutes on one executor.

I'm trying to increase the number of partitions my data is in so that I have
at most 1 record per executor (currently it creates 2 tasks, and hence 2
executors... I want it to split into at least 100 tasks at a time so I
get 5 records per task => ~20 min per task).


On Sun, Jul 1, 2018, 07:58 yujhe.li  wrote:

> Abdeali Kothari wrote
> > I am using Spark 2.3.0 and trying to read a CSV file which has 500
> > records.
> > When I try to read it, spark says that it has two stages: 10, 11 and then
> > they join into stage 12.
>
> What's your CSV size per file? I think Spark optimizer may put many files
> into one task when reading small files.
>
>
>
>


Repartition not working on a csv file

2018-06-18 Thread Abdeali Kothari
I am using Spark 2.3.0 and trying to read a CSV file which has 500 records.
When I try to read it, Spark says that it has two stages, 10 and 11, which
then join into stage 12.

This makes sense and is what I would expect, as I have 30 map-based UDFs,
after which I do a join, run another 10 UDFs, and then save the result as
Parquet.

Stages 10 and 11 have only 2 tasks according to Spark. I have a maximum of 20
executors possible on my cluster, and I would like Spark to use all 20
executors for this job.

*1csv+Repartition*: Right after reading the file, if I do a repartition, it
still takes *2 tasks*
*1csv+Repartition+count()*: Right after reading the file, if I do a
repartition and then do an action word like count(), it still takes *2
tasks*
*50csv*: If I split my 500line csv into 50 files with 10 lines each, it
takes *18 tasks*
*50csv+Repartition*: If I split my 500line csv into 50 files with 10 lines
each, and do a repartition and a count, it takes *19 tasks*
*500csv+Repartition*: If I split my 500line csv into 500 files with 1 line
each, and do a repartition and a count, it takes *19 tasks*

All repartitions above are: .repartition(200)

I can't understand what it's trying to do.
I was expecting that if I do a .repartition(200) it would just create 200
tasks after shuffling the data, but it's not doing that.
I recall this working fine on Spark 1.6.x.

PS: The reason I want more tasks is that those UDFs are very heavy and
slow - I'd like to use more executors to reduce computation time. I'm sure
they are parallelizable ...
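
One workaround sketch that is sometimes used for this (my own suggestion, not
something from this thread; the paths are hypothetical): materialize the
repartitioned data to Parquet and read it back, so the heavy UDF stage starts
from ~200 input splits instead of inheriting the 2-task CSV scan.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv("hdfs:///data/input.csv", header=True)  # hypothetical path

# Force a real stage boundary: write the shuffled data out and read it back,
# so downstream stages see ~200 splits instead of the original 2-task scan.
df.repartition(200).write.mode("overwrite").parquet("hdfs:///tmp/repartitioned")
df = spark.read.parquet("hdfs:///tmp/repartitioned")

# ... apply the heavy UDFs here; each partition now runs as its own task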