You could create a custom accumulator using a list or similar structure.
Some examples that could help:
https://towardsdatascience.com/custom-pyspark-accumulators-310f63ca3c8c
https://stackoverflow.com/questions/34798578/how-to-create-custom-list-accumulator-i-e-listint-int
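A minimal sketch of the idea: PySpark's accumulator contract is two methods, `zero` and `addInPlace`, which in real code you supply by subclassing `pyspark.accumulators.AccumulatorParam` and passing an instance to `sc.accumulator(...)`. The class below shows just that contract as standalone Python (no Spark session needed to follow the logic):

```python
# Sketch of a list-accumulator param. In PySpark this class would subclass
# pyspark.accumulators.AccumulatorParam and be used as:
#   acc = sc.accumulator([], ListParam())
#   rdd.foreach(lambda row: acc.add([row]))
class ListParam:
    def zero(self, initial_value):
        # Per-task starting value: an empty list.
        return []

    def addInPlace(self, value1, value2):
        # Merge two partial results by concatenation.
        value1.extend(value2)
        return value1
```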
On Tue, Aug 3, 2021 at 1:23 PM
Hi,
I am using pyspark for some projects. And one of the things we are doing is
trying to find the tables/columns being used by Spark using the execution
plan.
When we upgrade to spark 3.2 - the spark plan seems to be different from
previous versions - mainly when we are doing joins.
Below is a re
Just thought I'd do a quick bump and add the dev mailing list - in case
there is some insight there
Feels like this should be categorized as a bug for Spark 3.2.0.
On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari
wrote:
> Hi,
> I am using pyspark for some projects. And one of the th
.1 probably because of the upgraded
> AQE.
>
> not sure whether this is expected though.
>
> On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari
> wrote:
>
>> Just thought I'd do a quick bump and add the dev mailing list - in case
>> there is some insight there
>>
The Python profiler is pretty cool!
I'll try it out to see what could be taking time within the UDF.
I'm wondering if there is also some lightweight profiling (which does not
slow down my processing) for me to get:
- how much time the UDF took (like how much time was spent inside the UDF
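One lightweight option (a plain-Python sketch, not Spark's own instrumentation): wrap the UDF body in a decorator that tracks cumulative wall-clock time and call count; the totals could then be logged or fed to an accumulator. The `my_func` name below is made up for illustration:

```python
import functools
import time

def timed(fn):
    """Record cumulative wall-clock time and call count for fn."""
    stats = {"calls": 0, "seconds": 0.0}

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            stats["calls"] += 1
            stats["seconds"] += time.perf_counter() - start

    wrapper.stats = stats
    return wrapper

# The wrapped function would then be registered as the UDF, e.g.
#   my_udf = udf(timed(my_func), StringType())
@timed
def my_func(x):
    return x * 2
```

The overhead is one `perf_counter()` pair per call, which is usually negligible next to Python UDF serialization costs.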
ng”, there is some work in progress on
> instrumenting Python UDFs with Spark metrics, see
> https://issues.apache.org/jira/browse/SPARK-34265
>
> However it is a bit stuck at the moment, and needs to be revived I
> believe.
>
>
>
> Best,
>
> Luca
>
>
>
I am using Spark 2.3.0 and trying to read a CSV file which has 500 records.
When I try to read it, spark says that it has two stages: 10, 11 and then
they join into stage 12.
This makes sense and is what I would expect, as I have 30 map-based UDFs
after which I do a join, and run another 10 UDFs a
at my data is in so that I have at
maximum 1 record per executor (currently it sets 2 tasks, and hence 2
executors... I want it to split it into at least 100 tasks at a time so I
get 5 records per task => ~20min per task)
On Sun, Jul 1, 2018, 07:58 yujhe.li wrote:
> Abdeali Kothari wrote
>
ll doesn't help
On Sun, Jul 1, 2018, 08:30 yujhe.li wrote:
> Abdeali Kothari wrote
> > My entire CSV is less than 20KB.
> > By somewhere in between, I do a broadcast join with 3500 records in
> > another
> > file.
> > After the broadcast join I have a l
d try to force a repartition right at that point by producing a
> cached version of the DF with .cache() if memory allows it.
>
> On Sun, Jul 1, 2018 at 5:04 AM, Abdeali Kothari
> wrote:
>
>> I've tried that too - it doesn't work. It does a repartition, but not
>>
When I run Python UDFs with pyspark, I get multiple logs where it says:
18/11/22 01:51:59 INFO python.PythonUDFRunner: Times: total = 44, boot
= -25, init = 67, finish = 2
I am wondering if in these logs I can identify easily which of my
PythonUDFs this timing information is for (I have about a
My understanding is that the log is printed by PythonRunner.scala in the
Spark code base, but I may be mistaken.
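The timing line itself is easy to parse out of executor logs if you want to aggregate the numbers (it still won't tell you which UDF each line belongs to, which is the open question here). A hypothetical sketch, with the field names taken directly from the log format above:

```python
import re

# Matches lines like:
#   18/11/22 01:51:59 INFO python.PythonUDFRunner: Times: total = 44,
#   boot = -25, init = 67, finish = 2
PATTERN = re.compile(
    r"PythonUDFRunner: Times: total = (-?\d+), boot = (-?\d+), "
    r"init = (-?\d+), finish = (-?\d+)"
)

def parse_udf_times(line):
    """Return (total, boot, init, finish) millisecond values, or None."""
    m = PATTERN.search(line)
    if m is None:
        return None
    return tuple(int(g) for g in m.groups())

line = ("18/11/22 01:51:59 INFO python.PythonUDFRunner: "
        "Times: total = 44, boot = -25, init = 67, finish = 2")
```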
On Thu, Nov 22, 2018, 17:54 Eike von Seggern wrote:
> Hi,
>
> Abdeali Kothari wrote on Thu, 22 Nov 2018
> at 10:04:
>
>> When I run Python UDFs with pyspark, I get mult
I am using spark + celery to run some spark scripts async from the rest of
my code.
When any of my celery tasks get an error and throw a python Exception, the
celery on_error() is called and I can handle exceptions easily by logging
the exception.
Seems like the only exception that fails to work i
I seem to have an issue in Spark where I create a spark worker process and
listen for jobs from another machine. After about 24 hours and ~3000 jobs,
some jobs in my spark worker just hang indefinitely.
I am trying to set a timeout for my tasks so that the spark session can be
stopped and re-start
Yes, it will.
In general: Spark should spawn as many executors as it can to eat up all
the resources on a node.
On Tue, Feb 26, 2019, 11:59 Anton Puzanov wrote:
> Hello everyone,
>
> Spark has a dynamic resource allocation scheme, where, when available
> Spark manager will automatically add executors to
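Dynamic allocation is controlled by a handful of settings; a minimal `spark-defaults.conf` fragment as a sketch (the min/max values here are illustrative, not recommendations):

```properties
# Enable dynamic executor allocation. Requires the external shuffle
# service (or, on Spark 3.x, spark.dynamicAllocation.shuffleTracking.enabled=true).
spark.dynamicAllocation.enabled        true
spark.dynamicAllocation.minExecutors   1
spark.dynamicAllocation.maxExecutors   20
spark.shuffle.service.enabled          true
```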
I've been facing this issue for the past few months too.
I always thought it was an infrastructure issue, but we were never able to
figure out what the infra issue was.
If others are facing this issue too - then maybe it's a valid bug.
Does anyone have any ideas on how we can debug this?
On Fri,
I've faced this issue too - and a colleague pointed me to the documentation
-
https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
The dropDuplicates docs do not say that it guarantees returning
the "first" record (even if you sort your da
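The usual workaround is to make the ordering explicit yourself: in Spark, a `Window` partitioned by the key and ordered by your sort column, keeping rows where `row_number() == 1`. A pure-Python sketch of the same "sort, then keep first per key" semantics (the field names are made up; no Spark session needed here):

```python
def first_per_key(rows, key, order_by):
    """Keep one row per key: the first one after sorting by order_by."""
    rows = sorted(rows, key=lambda r: r[order_by])
    seen = {}
    for row in rows:
        # setdefault keeps only the first row seen for each key.
        seen.setdefault(row[key], row)
    return list(seen.values())

rows = [
    {"id": 1, "ts": 5, "val": "b"},
    {"id": 1, "ts": 2, "val": "a"},
    {"id": 2, "ts": 9, "val": "c"},
]
```

Unlike dropDuplicates, this makes the tie-breaking deterministic because the ordering is part of the operation itself.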
't cause
> issues.
>
> Sorry I didn't get it quickly. :)
>
> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari
> wrote:
>
>> I've faced this issue too - and a colleague pointed me to the
>> documentation -
>> https://spark.apache.org/docs/2.4.0/api/py
me.
>
> Thanks again.
>
> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari
> wrote:
>
>> So, the above code for min() worked for me fine in general, but there was
>> one corner case where it failed.
>> Which was when I have something like:
>> invoice_id=1, upda
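For reference, the "row with the minimum value" pattern is usually made deterministic by comparing a composite key rather than a single column - in Spark SQL, `min` over a `struct`; in plain Python the same idea is a tuple key. A hypothetical sketch with made-up fields:

```python
# Hypothetical invoice records: the goal is the row with the earliest
# update, breaking ties deterministically on a second field.
invoices = [
    {"invoice_id": 1, "updated": "2019-04-04", "amount": 10},
    {"invoice_id": 2, "updated": "2019-04-01", "amount": 30},
    {"invoice_id": 3, "updated": "2019-04-01", "amount": 20},
]

# Tuples compare element by element, so ties on `updated` fall back to
# `invoice_id` - the same trick as min over a struct in Spark SQL.
earliest = min(invoices, key=lambda r: (r["updated"], r["invoice_id"]))
```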
The syntax looks right.
Are you still getting the error when you open a new python session and run
this same code ?
Are you running on your laptop with spark local mode or are you running
this on a yarn based cluster ?
It does seem like something in your Python session isn't getting serialized
right.
The dataset is in a fairly popular format called the libsvm data format -
popularized by the libsvm library.
http://svmlight.joachims.org - The 'How to Use' section describes the file
format.
XGBoost uses the same file format and their documentation is here -
https://xgboost.readthedocs.io/en/l
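Each libsvm line is `<label> <index>:<value> <index>:<value> ...` - a sparse representation with sorted feature indices. A small parser sketch of the format (Spark itself can read it directly with `spark.read.format("libsvm")`):

```python
def parse_libsvm_line(line):
    """Parse one libsvm-format line into (label, {index: value})."""
    parts = line.strip().split()
    label = float(parts[0])
    features = {}
    for item in parts[1:]:
        # Each feature is written as index:value; absent indices are zero.
        idx, val = item.split(":")
        features[int(idx)] = float(val)
    return label, features
```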
I haven't used spark-sklearn much, but their travis file gives the
combination they test with:
https://github.com/databricks/spark-sklearn/blob/master/.travis.yml#L8
Also, your first email is a bit confusing - you mentioned Spark 2.2.3 but
the traceback path says spark-2.4.1-bin-hadoop2.6
I then t
While Spark can read from S3 directly in EMR, I believe it still needs
HDFS to perform shuffles and to write intermediate data to disk when
running jobs (i.e. when the in-memory data needs to spill over to disk).
For these operations, Spark does need a distributed file system - you could
use someth
How large is the data frame and what data type are you counting distinct
for?
I use count distinct quite a bit and haven't noticed anything peculiar.
Also, which exact version in 2.3.x?
And are you performing any operations on the DF before the countDistinct?
I recall there was a bug when I did cou
> df.agg(countDistinct(col('col1'))).show() --> 450089
> df.agg(countDistinct(col('col1'))).show() --> 450076
> df.filter(col('col1').isNull()).count() --> 0
> df.filter(col('col1').isNotNull()).count() --> 450063
>
> col1 is a strin
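For reference, `countDistinct` ignores NULLs, so its result should never exceed the `isNotNull` count - which is the invariant the numbers above violate. A pure-Python model of that semantics:

```python
def count_distinct(values):
    # Spark's countDistinct ignores NULLs, like this set-based version.
    return len({v for v in values if v is not None})

values = ["a", "b", "a", None, "c"]
non_null = [v for v in values if v is not None]
```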
Hi,
In spark 2.3+ I saw that pyarrow was being used in a bunch of places in
spark. And I was trying to understand the benefit in terms of serialization
/ deserialization it provides.
I understand that the new pandas-udf works only if pyarrow is installed.
But what about the plain old PythonUDF whi
w are virtually identical.
>
> @udf(...)
> def func(col):
>     return col
>
> @pandas_udf(...)
> def pandas_func(col):
>     return col.apply(lambda v: v)
>
> If we only need some minimised change, I would be positive about adding
> Arrow support into re
Maybe you can try running it in a python shell or jupyter-console/ipython
instead of a spark-submit and check how much time it takes too.
Compare the env variables to check that no additional env configuration is
present in either environment.
Also is the python environment for both the exact sam
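To compare the two environments concretely, you could dump a small snapshot in each and diff the output; a sketch (the `SPARK`/`PYTHON` prefix filter is just an assumption about which variables matter):

```python
import json
import os
import sys

# Capture interpreter identity plus Spark/Python-related env vars;
# run this in both environments and diff the printed JSON.
snapshot = {
    "python": sys.version,
    "executable": sys.executable,
    "env": {k: v for k, v in os.environ.items()
            if k.startswith(("SPARK", "PYTHON", "PY"))},
}
print(json.dumps(snapshot, indent=2, sort_keys=True))
```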
>>>> import base64
>>>> import zlib
>>>>
>>>> def decompress(data):
>>>>     bytecode = base64.b64decode(data)
>>>>     d = zlib.decompressobj(32 + zlib.MAX_WBITS)
>>>>     decompressed_data = d.decompress(bytecode)
>>>>     return decompressed_data
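For completeness, a self-contained round trip of the same decompress logic (the `32 + zlib.MAX_WBITS` window size tells `decompressobj` to auto-detect zlib or gzip headers):

```python
import base64
import zlib

def compress(raw: bytes) -> str:
    """zlib-compress then base64-encode; the inverse of decompress below."""
    return base64.b64encode(zlib.compress(raw)).decode("ascii")

def decompress(data: str) -> bytes:
    bytecode = base64.b64decode(data)
    # 32 + MAX_WBITS makes zlib auto-detect zlib or gzip headers.
    d = zlib.decompressobj(32 + zlib.MAX_WBITS)
    return d.decompress(bytecode)
```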
>> jupyter from the org git repo as it was shared, so I do not know how the venv
>> was created, or even how Python for the venv was created.
>>
>> The OS is CentOS release 6.9 (Final)
>>
>>
>>
>>
>>
>> Regards,
>> Dhrubajyoti Hati
>> Mob No: 9886428028