Re: Profiling PySpark Pandas UDF

2022-08-26 Thread Abdeali Kothari
ing”, there is some work in progress on > instrumenting Python UDFs with Spark metrics, see > https://issues.apache.org/jira/browse/SPARK-34265 > > However it is a bit stuck at the moment, and needs to be revived I > believe. > > > > Best, > > Luca > > > > *Fro

Re: Profiling PySpark Pandas UDF

2022-08-25 Thread Abdeali Kothari
The Python profiler is pretty cool! I'll try it out to see what could be taking time within the UDF. I'm wondering if there is also some lightweight profiling (which does not slow down my processing) for me to get: - how much time the UDF took (like how much time was spent inside the
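A minimal sketch of the kind of lightweight timing asked about here, not the built-in instrumentation discussed later in the thread: wrap the body of a pandas UDF with a timer and add the elapsed seconds to an accumulator, so the driver can read a rough total after the action finishes. The UDF and column names below are made up for illustration.

    import time

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.functions import pandas_udf

    spark = SparkSession.builder.getOrCreate()
    udf_seconds = spark.sparkContext.accumulator(0.0)

    @pandas_udf("double")
    def slow_square(col: pd.Series) -> pd.Series:
        start = time.perf_counter()
        result = (col * col).astype("float64")      # the real work would go here
        udf_seconds.add(time.perf_counter() - start)
        return result

    df = spark.range(1_000_000).withColumn("sq", slow_square("id"))
    # Aggregate over the UDF column so it actually runs (a bare count() could prune it away)
    df.select(F.sum("sq")).collect()
    print("approx. seconds spent inside the UDF:", udf_seconds.value)

Accumulator values are only merged when tasks complete, and retried tasks can double-count, so treat the number as an estimate rather than an exact measurement.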

Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-06 Thread Abdeali Kothari
y because the upgraded > aqe. > > not sure whether this is expected though. > > On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari > wrote: > >> Just thought I'd do a quick bump and add the dev mailing list - in case >> there is some insight there >> Feels like th

Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-05 Thread Abdeali Kothari
Just thought I'd do a quick bump and add the dev mailing list - in case there is some insight there. Feels like this should be categorized as a bug for Spark 3.2.0. On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari wrote: > Hi, > I am using PySpark for some projects. And one of the thi

Spark 3.2 - ReusedExchange not present in join execution plan

2021-12-29 Thread Abdeali Kothari
Hi, I am using PySpark for some projects, and one of the things we are doing is trying to find the tables/columns being used by Spark from the execution plan. When we upgrade to Spark 3.2, the Spark plan seems to be different from previous versions - mainly when we are doing joins. Below is a
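One way to get at the tables/columns programmatically is to walk the analyzed logical plan through the py4j handle. This is a sketch only: it relies on private internals (_jdf, queryExecution) that differ between Spark versions, which is also why the plan shape can change on upgrade.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.range(10).createOrReplaceTempView("t1")
    spark.range(10).createOrReplaceTempView("t2")

    df = spark.sql("SELECT a.id FROM t1 a JOIN t2 b ON a.id = b.id")

    # String form of the analyzed plan; relations and scans appear as plan nodes
    plan_text = df._jdf.queryExecution().analyzed().toString()
    print(plan_text)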

Re: Collecting list of errors across executors

2021-08-03 Thread Abdeali Kothari
You could create a custom accumulator using a linked list or similar. Some examples that could help: https://towardsdatascience.com/custom-pyspark-accumulators-310f63ca3c8c https://stackoverflow.com/questions/34798578/how-to-create-custom-list-accumulator-i-e-listint-int On Tue, Aug 3, 2021 at 1:23
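For reference, a minimal sketch of such a list accumulator using PySpark's AccumulatorParam interface (zero/addInPlace), along the lines of the linked examples:

    from pyspark import SparkContext
    from pyspark.accumulators import AccumulatorParam


    class ListAccumulator(AccumulatorParam):
        def zero(self, initial_value):
            return []

        def addInPlace(self, v1, v2):
            # v2 is either a single element added by a task or a list being merged
            if isinstance(v2, list):
                v1.extend(v2)
            else:
                v1.append(v2)
            return v1


    sc = SparkContext.getOrCreate()
    errors = sc.accumulator([], ListAccumulator())


    def parse(value):
        try:
            return int(value)
        except ValueError:
            errors.add("could not parse: %r" % value)
            return None


    sc.parallelize(["1", "2", "oops", "4"]).map(parse).collect()
    print(errors.value)   # -> ["could not parse: 'oops'"]

Note that task retries can add duplicate entries, so this is best used for diagnostics rather than exact bookkeeping.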

Re: script running in jupyter 6-7x faster than spark submit

2019-09-11 Thread Abdeali Kothari
;> jupyter from org git repo as it was shared, so i do not know how the venv >> was created or python for venv was created even. >> >> The os is CentOS release 6.9 (Final) >> >> >> >> >> >> *Regards,Dhrubajyoti Hati.Mob No: 9886428028/

Re: script running in jupyter 6-7x faster than spark submit

2019-09-11 Thread Abdeali Kothari
>>>> import base64
>>>> import zlib
>>>>
>>>> def decompress(data):
>>>>     bytecode = base64.b64decode(data)
>>>>     d = zlib.decompressobj(32 + zlib.MAX_WBITS)
>>>>     decompressed_data = d.decompress(bytecode

Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Abdeali Kothari
Maybe you can try running it in a Python shell or jupyter-console/ipython instead of through spark-submit, and check how much time it takes too. Compare the environment variables to check that no additional env configuration is present in either environment. Also, is the Python environment for both the exact
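A quick way to do that comparison is to run the same small snippet in the Jupyter kernel and again inside the spark-submit driver, then diff the two outputs:

    import os
    import sys

    print("python:", sys.executable, sys.version)
    for key in sorted(os.environ):
        if any(token in key for token in ("SPARK", "PYTHON", "PATH", "JAVA")):
            print(key, "=", os.environ.get(key))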

Re: Usage of PyArrow in Spark

2019-07-18 Thread Abdeali Kothari
ef pandas_func(col): > return a.apply(lambda col: col) > > If we only need some minimised change, I would be positive about adding > Arrow support into regular Python UDFs. Otherwise, I am not sure yet. > > > On Wed, Jul 17, 2019 at 1:19 PM, Abdeali Kothari wrote: > >

Usage of PyArrow in Spark

2019-07-16 Thread Abdeali Kothari
Hi, In Spark 2.3+ I saw that PyArrow is being used in a bunch of places in Spark, and I was trying to understand the benefit it provides in terms of serialization/deserialization. I understand that the new pandas UDF works only if PyArrow is installed. But what about the plain old PythonUDF
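For context, the difference being discussed: a plain Python UDF is evaluated row by row, with data shuttled between the JVM and the Python worker (historically via pickle-based serialization), while a pandas UDF receives whole column batches over Arrow and works vectorised in pandas. A small sketch of the two side by side; function names are made up and any timing comparison is illustrative only.

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.functions import udf, pandas_udf
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(1_000_000).selectExpr("cast(id as double) as x")

    @udf(DoubleType())
    def plus_one_plain(x):
        return x + 1.0        # invoked once per row

    @pandas_udf("double")
    def plus_one_arrow(x: pd.Series) -> pd.Series:
        return x + 1.0        # invoked once per Arrow batch

    df.select(plus_one_plain("x").alias("y")).agg(F.max("y")).show()
    df.select(plus_one_arrow("x").alias("y")).agg(F.max("y")).show()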

Re: [pyspark 2.3+] CountDistinct

2019-07-01 Thread Abdeali Kothari
--> 450076 > df.filter(col('col1').isNull()).count() --> 0 > df.filter(col('col1').isNotNull()).count() --> 450063 > > col1 is a string > Spark version 2.4.0 > datasize: ~ 500GB > > > On Sat, Jun 29, 2019 at 5:33 AM Abdeali Kothari > wrote: > >> How l

Re: [pyspark 2.3+] CountDistinct

2019-06-29 Thread Abdeali Kothari
How large is the data frame, and what data type are you counting distinct on? I use countDistinct quite a bit and haven't noticed anything peculiar. Also, which exact version in 2.3.x? And are you performing any operations on the DF before the countDistinct? I recall there was a bug when I did
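A minimal sketch of the exact versus approximate variants; the approximate one (HyperLogLog++-based) is usually far cheaper on large string columns when an exact answer is not required. The tiny DataFrame here is only a stand-in.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("a",), ("b",), ("a",), (None,)], ["col1"])

    df.select(
        F.countDistinct("col1").alias("exact"),
        F.approx_count_distinct("col1", rsd=0.01).alias("approx"),
    ).show()

Both functions skip nulls, which is worth remembering when total counts and distinct counts do not seem to add up.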

Re: [spark on yarn] spark on yarn without DFS

2019-05-19 Thread Abdeali Kothari
While Spark can read from S3 directly in EMR, I believe it still needs HDFS to perform shuffles and to write intermediate data to disk during jobs (i.e. when the in-memory data needs to spill over to disk). For these operations, Spark does need a distributed file system - you could use
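A hedged sketch of the kind of setup being described: point Spark's Hadoop configuration at S3A and give executors local scratch space for shuffle and spill. The bucket name and paths are placeholders, the hadoop-aws and AWS SDK jars must be on the classpath, and the exact settings depend on the Hadoop/EMR build in use.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("s3a-no-hdfs")
        .config("spark.hadoop.fs.defaultFS", "s3a://my-bucket")   # placeholder bucket
        .config("spark.local.dir", "/mnt/spark-scratch")          # shuffle/spill on local disk
        .getOrCreate()
    )

    df = spark.read.parquet("s3a://my-bucket/events/")            # read input directly from S3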

Re: spark-sklearn

2019-04-08 Thread Abdeali Kothari
I haven't used spark-sklearn much, but their Travis file gives the combinations they test with: https://github.com/databricks/spark-sklearn/blob/master/.travis.yml#L8 Also, your first email is a bit confusing - you mentioned Spark 2.2.3, but the traceback path says spark-2.4.1-bin-hadoop2.6. I then

Re: Qn about decision tree apache spark java

2019-04-04 Thread Abdeali Kothari
The dataset is in a fairly popular format called the libsvm data format, popularized by the libsvm library. http://svmlight.joachims.org - the 'How to Use' section describes the file format. XGBoost uses the same file format, and their documentation is here -
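For reference, a tiny sketch of the format ("<label> <index>:<value> ...", with 1-based, increasing feature indices) and of reading it through Spark's built-in libsvm data source; the file path is a placeholder.

    from pyspark.sql import SparkSession

    sample = "0 1:0.5 3:1.2\n1 2:0.7 4:3.1\n"
    with open("/tmp/sample.libsvm", "w") as f:       # placeholder path
        f.write(sample)

    spark = SparkSession.builder.getOrCreate()
    df = (
        spark.read.format("libsvm")
        .option("numFeatures", 4)                    # optional; avoids an extra pass over the data
        .load("/tmp/sample.libsvm")
    )
    df.show(truncate=False)   # columns: label (double), features (sparse vector)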

Re: pickling a udf

2019-04-04 Thread Abdeali Kothari
The syntax looks right. Are you still getting the error when you open a new Python session and run this same code? Are you running on your laptop with Spark local mode, or are you running this on a YARN-based cluster? It does seem like something in your Python session isn't getting serialized
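A minimal sketch of the usual culprit when a UDF fails to pickle: the function's closure capturing something that holds a JVM/py4j handle (the SparkSession, a DataFrame, an open connection). Keeping the function body self-contained generally fixes it; names below are made up.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import LongType

    spark = SparkSession.builder.getOrCreate()

    # Problematic pattern: closing over `spark`, which holds an unpicklable py4j gateway
    # bad = udf(lambda x: x + len(spark.sparkContext.appName), LongType())

    # Self-contained function: only plain Python objects are referenced inside it
    @udf(LongType())
    def add_offset(x):
        offset = 10
        return x + offset

    spark.range(5).select(add_offset("id")).show()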

Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Abdeali Kothari
gt; > Thanks again. > > On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari > wrote: > >> So, the above code for min() worked for me fine in general, but there was >> one corner case where it failed. >> Which was when I have something like: >> invoice_id=1, update_ti

Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Abdeali Kothari
sues. > > Sorry I didn't get quickly. :) > > On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari > wrote: > >> I've faced this issue too - and a colleague pointed me to the >> documentation - >> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.

Re: dropDuplicate on timestamp based column unexpected output

2019-04-03 Thread Abdeali Kothari
I've faced this issue too - and a colleague pointed me to the documentation - https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates The dropDuplicates docs do not say that it guarantees returning the "first" record (even if you sort your
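A sketch of the deterministic alternative usually suggested alongside that documentation note: instead of relying on dropDuplicates to keep the "right" row, rank the rows per key with a window and keep the latest one explicitly. Column names are taken from this thread; the data is a stand-in.

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, "2019-04-01 10:00:00", "a"), (1, "2019-04-02 09:00:00", "b")],
        ["invoice_id", "update_time", "payload"],
    )

    w = Window.partitionBy("invoice_id").orderBy(F.col("update_time").desc())
    latest = (
        df.withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1)
          .drop("rn")
    )
    latest.show()   # keeps the 2019-04-02 row for invoice_id=1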

Re: Occasional broadcast timeout when dynamic allocation is on

2019-02-26 Thread Abdeali Kothari
I've been facing this issue for the past few months too. I always thought it was an infrastructure issue, but we were never able to figure out what the infra issue was. If others are facing this issue too - then maybe it's a valid bug. Does anyone have any ideas on how we can debug this? On
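Two configuration knobs that often come up while debugging this, shown only as a sketch (they work around the symptom rather than explain the root cause): raising the broadcast timeout, or disabling automatic broadcast joins so the query falls back to a sort-merge join.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.broadcastTimeout", "1200")         # seconds; default is 300
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)   # disable automatic broadcast joins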

Re: Spark dynamic allocation with special executor configuration

2019-02-25 Thread Abdeali Kothari
Yes, it will. In general, Spark should spawn as many executors as it can to eat up all the resources on a node. On Tue, Feb 26, 2019, 11:59 Anton Puzanov wrote: Hello everyone, > > Spark has a dynamic resource allocation scheme, where, when available, > the Spark manager will automatically add executors to
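For concreteness, a sketch of the settings this behaviour is governed by: with a fixed per-executor size, dynamic allocation keeps adding executors of that size (up to the configured maximum) while there are pending tasks. Values below are placeholders.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.dynamicAllocation.minExecutors", "1")
        .config("spark.dynamicAllocation.maxExecutors", "50")    # placeholder cap
        .config("spark.shuffle.service.enabled", "true")         # usually required on YARN
        .config("spark.executor.cores", "4")                     # fixed per-executor size
        .config("spark.executor.memory", "8g")
        .getOrCreate()
    )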

Identifying cause of exception in PySpark

2018-12-09 Thread Abdeali Kothari
I seem to have an issue in Spark where I create a Spark worker process and listen for jobs from another machine. After about 24 hours and ~3000 jobs, some jobs in my Spark worker just hang indefinitely. I am trying to set a timeout for my tasks so that the Spark session can be stopped and

PicklingError - Can't pickle py4j.protocol.Py4JJavaError - it's not the same object

2018-12-02 Thread Abdeali Kothari
I am using Spark + Celery to run some Spark scripts asynchronously from the rest of my code. When any of my Celery tasks gets an error and throws a Python exception, Celery's on_error() is called and I can handle exceptions easily by logging them. It seems like the only exception that fails to work
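A sketch of the usual workaround pattern for this (the wrapper names are made up; this is not a fix inside Celery or py4j): catch Py4JJavaError at the task boundary and re-raise a plain exception carrying only the string form of the Java stack trace, since that pickles cleanly.

    from py4j.protocol import Py4JJavaError


    class SparkJobError(RuntimeError):
        """Plain, picklable exception for Celery's result backend."""


    def run_spark_job(fn, *args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Py4JJavaError as exc:
            # str(exc) includes the Java-side stack trace; the original object
            # holds a live py4j gateway reference and cannot be pickled.
            raise SparkJobError(str(exc)) from None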

Re: Show function name in Logs for PythonUDFRunner

2018-11-22 Thread Abdeali Kothari
My understanding is that the log is printed by PythonRunner.scala in the Spark code base, but I may be mistaken. On Thu, Nov 22, 2018, 17:54 Eike von Seggern wrote: Hi, > > Abdeali Kothari wrote on Thu, 22 Nov 2018 > at 10:04: > >> When I run Python UDFs with pyspark, I get m

Show function name in Logs for PythonUDFRunner

2018-11-22 Thread Abdeali Kothari
When I run Python UDFs with PySpark, I get multiple log lines like: 18/11/22 01:51:59 INFO python.PythonUDFRunner: Times: total = 44, boot = -25, init = 67, finish = 2 I am wondering whether I can easily identify from these logs which of my Python UDFs this timing information is for (I have about a

Re: Repartition not working on a csv file

2018-07-01 Thread Abdeali Kothari
try to force a repartition right at that point by producing a > cached version of the DF with .cache() if memory allows it. > > On Sun, Jul 1, 2018 at 5:04 AM, Abdeali Kothari > wrote: > >> I've tried that too - it doesn't work. It does a repartition, but not >> right afte

Re: Repartition not working on a csv file

2018-06-30 Thread Abdeali Kothari
help On Sun, Jul 1, 2018, 08:30 yujhe.li wrote: > Abdeali Kothari wrote > > My entire CSV is less than 20KB. > > But somewhere in between, I do a broadcast join with 3500 records in > > another > > file. > > After the broadcast join I have a lot of proces

Re: Repartition not working on a csv file

2018-06-30 Thread Abdeali Kothari
data is in so that I have at maximum 1 record per executor (currently it sets 2 tasks, and hence 2 executors... I want it to split it into at least 100 tasks at a time so I get 5 records per task => ~20min per task) On Sun, Jul 1, 2018, 07:58 yujhe.li wrote: > Abdeali Kothari wrote > >

Repartition not working on a csv file

2018-06-18 Thread Abdeali Kothari
I am using Spark 2.3.0 and trying to read a CSV file which has 500 records. When I try to read it, Spark says that it has two stages: 10 and 11, which then join into stage 12. This makes sense and is what I would expect, as I have 30 map-based UDFs, after which I do a join and run another 10 UDFs
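A minimal sketch of the pattern being attempted here (the file path and numbers are placeholders): check the partition count right after the read, then repartition before the expensive UDFs; caching after the repartition, as suggested in one of the replies above, can help ensure the extra partitions are actually used downstream.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.read.csv("/path/to/input.csv", header=True)       # placeholder path
    print("partitions after read:", df.rdd.getNumPartitions())

    df = df.repartition(100)                                     # ~5 records per task for 500 rows
    print("partitions after repartition:", df.rdd.getNumPartitions())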