Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Dhrubajyoti Hati
Just checked from where the script is submitted, i.e. on the driver side: the python
envs are different. The Jupyter one runs inside a virtual environment with
Python 2.7.5, while the spark-submit one uses 2.6.6. But the executors have the
same python version, right? I tried doing a spark-submit from the jupyter shell;
it fails because python 2.7 is not there and throws an error.

Here is the UDF which might be taking the time:

import base64
import zlib

def decompress(data):
    bytecode = base64.b64decode(data)
    d = zlib.decompressobj(32 + zlib.MAX_WBITS)
    decompressed_data = d.decompress(bytecode)
    return decompressed_data.decode('utf-8')
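
For context, a minimal sketch (not the original job; the DataFrame and column
names here are made up) of how this function would typically be wrapped as a
PySpark UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical usage: "df" and the column "payload" are placeholders.
decompress_udf = udf(decompress, StringType())
decoded_df = df.withColumn("decoded", decompress_udf(df["payload"]))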


Could this be because of the mismatch between the two python environments on
the driver side? But the processing happens on the executor side, right?
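
If the driver-side mismatch does turn out to matter, one hedged way to rule it
out is to pin both submission paths to the same interpreter. A sketch (the
virtualenv path is a placeholder for the Python 2.7.5 environment mentioned
above); these must be set before the SparkContext is created:

import os

# Placeholders: point both driver and executors at the same interpreter.
os.environ["PYSPARK_PYTHON"] = "/path/to/venv/bin/python"         # executors
os.environ["PYSPARK_DRIVER_PYTHON"] = "/path/to/venv/bin/python"  # driver

# The equivalent spark-submit confs (Spark 2.1+):
#   --conf spark.pyspark.python=/path/to/venv/bin/python
#   --conf spark.pyspark.driver.python=/path/to/venv/bin/python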




*Regards,*
*Dhrub*


Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Abdeali Kothari
Maybe you can try running it in a python shell or jupyter-console/ipython
instead of a spark-submit and check how much time it takes too.

Compare the env variables to check that no additional env configuration is
present in either environment.
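
One hedged way to do that comparison is to dump the environment from inside
each session and diff the two files afterwards (file names are arbitrary):

import os

# Run once inside Jupyter and once inside the spark-submit script,
# writing to different files, then diff them.
with open("/tmp/env_dump.txt", "w") as f:
    for key in sorted(os.environ):
        f.write("%s=%s\n" % (key, os.environ[key]))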

Also, is the python environment the exact same for both? I ask because it
looks like you're using a UDF, and if the Jupyter python has (let's say)
numpy compiled with BLAS it would be faster than a numpy without it, etc.
I.e. some library you use may be pure python in one environment and a
faster C extension in the other...
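
A quick sketch of how each session's interpreter and library builds could be
checked (numpy is only the example used above; substitute whatever the UDFs
actually import):

import sys
print(sys.executable)   # which python binary this session runs on
print(sys.version)

import numpy            # example only; any compiled dependency works the same way
print(numpy.__file__)   # which installation is being picked up
numpy.show_config()     # shows whether BLAS/LAPACK were linked in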

What python libraries are you using in the UDFs? If you don't use UDFs at
all and use some very simple pure spark functions, does the time difference
still exist? (A baseline sketch follows below.)
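
As a baseline, something like the following UDF-free job could be timed in
both environments (the table and column names are placeholders, and a spark
session object is assumed):

import time

start = time.time()
count = spark.read.table("some_db.some_table") \
    .filter("colp = 'some_value'") \
    .count()
print("count=%d, took %.1fs" % (count, time.time() - start))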

Also, are you using dynamic allocation or some similar spark config which
could vary performance between runs because the same resources were not
utilized on Jupyter / spark-submit?
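
A hedged way to verify this is to read the resource settings off the live
context in both sessions rather than trusting the submitted conf (assumes a
spark session object is already available):

conf = spark.sparkContext.getConf()
for key in ("spark.dynamicAllocation.enabled",
            "spark.executor.instances",
            "spark.executor.cores",
            "spark.executor.memory"):
    print(key, conf.get(key, "<not set>"))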




Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Stephen Boesch
Ok. Can't think of why that would happen.



Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Dhrubajyoti Hati
As mentioned in the very first mail:
* both are submitted to the same cluster
* both are submitted from the same machine and by the same user
* each of them has 128 executors with 2 cores per executor and 8 GB of
memory each, and both of them are getting that while running

To clarify further, let me quote what I mentioned above. *These data are taken
from the Spark UI when the jobs are almost finished in both.*
"What I found is that the quantile value for the median for the one run with
jupyter was 1.3 mins and for the one run with spark-submit was ~8.5 mins," which
means the per-task time taken is much higher in the spark-submit script than in
the jupyter script. This is where I am really puzzled, because they are the
exact same code. Why does running them in two different ways vary so much in
execution time?
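
For anyone who wants to pull the same numbers without clicking through the UI,
a rough sketch using Spark's monitoring REST API (host, port and application id
are placeholders, and the requests library is assumed to be installed):

import requests

base = "http://driver-host:4040/api/v1"
app_id = requests.get(base + "/applications").json()[0]["id"]
for stage in requests.get("%s/applications/%s/stages" % (base, app_id)).json():
    print(stage["stageId"], stage["status"], stage["executorRunTime"])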




*Regards,*
*Dhrubajyoti Hati.*
*Mob No: 9886428028/9652029028*




Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Stephen Boesch
Sounds like you have done your homework to properly compare. I'm
guessing the answer to the following is yes, but in any case: are they
both running against the same spark cluster with the same configuration
parameters, especially executor memory and number of workers?

>


Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Dhrubajyoti Hati
No, I checked for that; hence I wrote "brand new" jupyter notebook. Also,
the times taken by the two are ~30 mins and ~3 hrs, as I am reading ~500 gigs of
compressed, base64-encoded text data from a hive table and decompressing and
decoding it in one of the udfs. Also, the time compared is from the Spark UI, not
how long the job actually takes after submission. It's just the running time
I am comparing/mentioning.

As mentioned earlier, all the spark conf params match in the two scripts,
and that's why I am puzzled about what is going on.



Access all of the custom streaming query listeners that were registered to spark session

2019-09-10 Thread Natalie Ruiz
Hello,

Is there a way to access all of the custom listeners that have been registered
to a spark session? I want to remove the listeners that I am no longer using,
but I don't know what they were saved as; I just see testing output messages
on my streaming query. I created a stack overflow question here for more
detail:

https://stackoverflow.com/questions/57876607/how-to-access-all-of-an-apache-sparks-session-streaming-query-custom-listeners

Thank you,
Natalie


Re: question about pyarrow.Table to pyspark.DataFrame conversion

2019-09-10 Thread Bryan Cutler
Hi Artem,

I don't believe this is currently possible, but it could be a great
addition to PySpark since this would offer a convenient and efficient way
to parallelize nested column data. I created the JIRA
https://issues.apache.org/jira/browse/SPARK-29040 for this.
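
Until something like that lands, one hedged workaround that avoids pandas while
keeping nested Arrow types is a Parquet round-trip (the path is a placeholder
and should be on storage the cluster can read):

import pyarrow as pa
import pyarrow.parquet as pq

# Nested List[int] column, written to Parquet and read back by Spark.
table = pa.Table.from_pydict({"ids": [[1, 2], [3], []]})
pq.write_table(table, "/tmp/arrow_roundtrip.parquet")

df = spark.read.parquet("/tmp/arrow_roundtrip.parquet")
df.printSchema()   # ids: array<bigint>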

On Tue, Aug 27, 2019 at 7:55 PM Artem Kozhevnikov <
kozhevnikov.ar...@gmail.com> wrote:

> I wonder if there's some recommended method to convert an in-memory
> pyarrow.Table (or pyarrow.RecordBatch) to a pyspark DataFrame without using
> pandas?
> My motivation is about converting nested data (like List[int]) that has
> an efficient representation in pyarrow but is not possible with Pandas (I
> don't want to go through a python list of ints ...).
>
> Thanks in advance!
> Artem
>
>
>


Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Patrick McCarthy
It's not obvious from what you pasted, but perhaps the jupyter notebook
is already connected to a running spark context, while spark-submit needs
to get a new spot in the (YARN?) queue.

I would check the cluster job IDs for both to ensure you're getting new
cluster tasks for each.
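
A hedged way to check that from inside both environments (assuming a spark
session object in each) is to print what each one is actually attached to:

sc = spark.sparkContext
print(sc.applicationId)   # e.g. application_..._0042 on YARN; should differ per run
print(sc.master)          # yarn, local[*], spark://..., etc.
print(sc.uiWebUrl)        # which Spark UI this job reports to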



-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Dhrubajyoti Hati
Hi,

I am facing a weird behaviour while running a python script. Here is what
the code looks like mostly:

def fn1(ip):
    # some code...
    ...

def fn2(row):
    # ...
    # some operations
    # ...
    return row1


udf_fn1 = udf(fn1)
cdf = spark.read.table("")  # hive table is of size > 500 Gigs with ~4500 partitions
ddf = cdf.withColumn("coly", udf_fn1(cdf.colz)) \
    .drop("colz") \
    .withColumnRenamed("colz", "coly")

edf = ddf \
    .filter(ddf.colp == 'some_value') \
    .rdd.map(lambda row: fn2(row)) \
    .toDF()

print edf.count()  # simple way for the performance test in both platforms

Now when I run the same code in a brand new jupyter notebook it runs 6x
faster than when I run this python script using spark-submit. The
configurations are printed and compared from both platforms and they
are exactly the same. I even tried to run this script in a single cell of a
jupyter notebook and still get the same performance. I need to understand if
I am missing something in the spark-submit which is causing the issue. I tried
to minimise the script to reproduce the same behaviour without much code.

Both are run in client mode on a YARN-based spark cluster. The machines
from which both are executed are also the same, and by the same user.

What I found is that the quantile value for the median for the one run with
jupyter was 1.3 mins and for the one run with spark-submit was ~8.5 mins. I am
not able to figure out why this is happening.

Has anyone faced this kind of issue before or knows how to resolve it?

*Regards,*
*Dhrub*


Deadlock using Barrier Execution

2019-09-10 Thread csmith
I'm using barrier execution in my spark job but am occasionally seeing
deadlocks where the task scheduler is unable to place all the tasks. The
failure is logged but the job hangs indefinitely. I have 2 executors with 16
cores each, using standalone mode (I think? I'm using Databricks). The
dataset has 31 partitions.

One thing I've noticed when this occurs is that the number of "Active Tasks"
exceeds the number of cores on one executor. How is the executor able to
start more tasks than it has cores? spark.executor.cores is not set.

Looking at the source I came across this issue,
https://issues.apache.org/jira/browse/SPARK-24818, which looks related, but
I'm not sure how to properly address it.
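
Not a fix for the scheduler issue itself, but a hedged sketch of the two knobs
that usually matter for barrier stages: making spark.executor.cores explicit so
the number of slots is predictable, and keeping the partition count at or below
the total slot count (the numbers and names below are illustrative only):

from pyspark.sql import SparkSession
from pyspark import BarrierTaskContext

spark = (SparkSession.builder
         .appName("barrier-sketch")
         .config("spark.executor.cores", "16")   # 2 executors x 16 cores = 32 slots
         .getOrCreate())

def work(iterator):
    ctx = BarrierTaskContext.get()
    ctx.barrier()                  # all 31 tasks must hold a slot at the same time
    yield sum(1 for _ in iterator)

rdd = spark.sparkContext.parallelize(range(1000), 31)   # 31 partitions, as above
print(rdd.barrier().mapPartitions(work).collect())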



 


Example log

19/09/10 13:39:55 WARN FairSchedulableBuilder: A job was submitted with
scheduler pool 7987057535266794448, which has not been configured. This can
happen when the file that pools are read from isn't set, or when that file
doesn't contain 7987057535266794448. Created 7987057535266794448 with
default configuration (schedulingMode: FIFO, minShare: 0, weight: 1)
19/09/10 13:39:55 INFO FairSchedulableBuilder: Added task set TaskSet_20.0
tasks to pool 7987057535266794448
19/09/10 13:39:55 INFO TaskSetManager: Starting task 1.0 in stage 20.0 (TID
1705, 10.222.231.128, executor 0, partition 1, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 0.0 in stage 20.0 (TID
1706, 10.222.234.66, executor 1, partition 0, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 5.0 in stage 20.0 (TID
1707, 10.222.231.128, executor 0, partition 5, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 2.0 in stage 20.0 (TID
1708, 10.222.234.66, executor 1, partition 2, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 7.0 in stage 20.0 (TID
1709, 10.222.231.128, executor 0, partition 7, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 3.0 in stage 20.0 (TID
1710, 10.222.234.66, executor 1, partition 3, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 8.0 in stage 20.0 (TID
1711, 10.222.231.128, executor 0, partition 8, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 4.0 in stage 20.0 (TID
1712, 10.222.234.66, executor 1, partition 4, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 9.0 in stage 20.0 (TID
1713, 10.222.231.128, executor 0, partition 9, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 6.0 in stage 20.0 (TID
1714, 10.222.234.66, executor 1, partition 6, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 12.0 in stage 20.0 (TID
1715, 10.222.231.128, executor 0, partition 12, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 10.0 in stage 20.0 (TID
1716, 10.222.234.66, executor 1, partition 10, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 13.0 in stage 20.0 (TID
1717, 10.222.231.128, executor 0, partition 13, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 11.0 in stage 20.0 (TID
1718, 10.222.234.66, executor 1, partition 11, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 16.0 in stage 20.0 (TID
1719, 10.222.231.128, executor 0, partition 16, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 14.0 in stage 20.0 (TID
1720, 10.222.234.66, executor 1, partition 14, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 17.0 in stage 20.0 (TID
1721, 10.222.231.128, executor 0, partition 17, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 15.0 in stage 20.0 (TID
1722, 10.222.234.66, executor 1, partition 15, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 18.0 in stage 20.0 (TID
1723, 10.222.231.128, executor 0, partition 18, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 21.0 in stage 20.0 (TID
1724, 10.222.234.66, executor 1, partition 21, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 19.0 in stage 20.0 (TID
1725, 10.222.231.128, executor 0, partition 19, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 22.0 in stage 20.0 (TID
1726, 10.222.234.66, executor 1, partition 22, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 20.0 in stage 20.0 (TID
1727, 10.222.231.128, executor 0, partition 20, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 24.0 in stage 20.0 (TID
1728, 10.222.234.66, executor 1, partition 24, NODE_LOCAL, 5669 bytes)
19/09/10 13:39:55 INFO TaskSetManager: Starting task 23.0 in stage 20.0 (TID
1729, 10.222.231.128, executor 0, partition 23, NODE_LOCAL, 5669 bytes)
19/09/10 13:

Custom encoders and udf's

2019-09-10 Thread jelmer
Hi,

I am using a org.apache.spark.sql.Encoder to serialize a custom object.

I now want to pass this column to a udf so it can do some operations on it,
but this gives me the error:

Caused by: java.lang.ClassCastException: [B cannot be cast to

The code included below demonstrates the issue.

I know I can simply make Person a case class in this example, but it's for
illustration purposes.

Does anyone know how to solve this problem?




import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions._
import org.scalatest.FunSuite
import org.scalatest.Matchers._

class Person(val name: String) extends Serializable

class MySpec extends FunSuite with DatasetSuiteBase {

  test("udf test") {

    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val myUdf = udf { person: Person => person.name }

    implicit val personEncoder: Encoder[Person] =
      Encoders.javaSerialization[Person]

    implicit val partitionAndPersonEncoder: Encoder[(Int, Person)] =
      Encoders.tuple(Encoders.scalaInt, personEncoder)

    val input = sc.parallelize(Seq(
      1 -> new Person("jack"),
      2 -> new Person("jill")
    )).toDF("partition", "value")

    input.printSchema()

    input.select(myUdf($"value")).show()
  }

}