Re: A Persisted Spark DataFrame is computed twice

2022-02-01 Thread Gourav Sengupta
> .persist(StorageLevel.MEMORY_ONLY) \
> .checkpoint() \
> .coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>1. persist (to memory) + checkpoint + without coalesce computes the
>DataFrame twice
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
> .persist(StorageLevel.MEMORY_ONLY) \
> .checkpoint() \
> .write.mode("overwrite").parquet(output_mod)
>
>
>1. cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce
>computes it twice
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
> .cache()
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> A
>
> Manually writing the output computes it only once. The function repart_hdfs
> below is a helper I wrote to write a DataFrame to disk, read it back,
> repartition/coalesce it, and then write it back to HDFS.
>
> spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
> .write.mode("overwrite").parquet(output_mod)
> repart_hdfs(spark, output_mod, 300, coalesce=True)
>
>
>
>
>
>
>
>
>
>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub
> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
> | Docker Hub <https://hub.docker.com/r/dclong/>
> ------
> *From:* Sebastian Piu 
> *Sent:* Sunday, January 30, 2022 12:44 AM
> *To:* Benjamin Du 
> *Cc:* u...@spark.incubator.apache.org 
> *Subject:* Re: A Persisted Spark DataFrame is computed twice
>
> It's probably the repartitioning and deserialising the df that you are
> seeing take time. Try doing this
>
> 1. Add another count after your current one and compare times
> 2. Move coalesce before persist
>
>
>
> You should see
>
> On Sun, 30 Jan 2022, 08:37 Benjamin Du,  wrote:
>
> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(...) \
> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
> .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> BTW, it works well if I manually write the DataFrame to HDFS, read it
> back, coalesce it and write it back to HDFS.
> Originally posted at
> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.
> <https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub
> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
> | Docker Hub <https://hub.docker.com/r/dclong/>
>
>


Re: A Persisted Spark DataFrame is computed twice

2022-01-31 Thread Sean Owen
One guess - you are doing two things here, count() and write(). There is a
persist(), but it's async. It won't necessarily wait for the persist to
finish before proceeding and may have to recompute at least some partitions
for the second op. You could debug further by looking at the stages and
seeing what exactly is executing and where it uses cached partitions or not.
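
For example, a minimal check from PySpark itself, reusing the df from the
original post (a sketch; only explain() and the storageLevel property are
added here):

df.count()                  # first action; should populate the DISK_ONLY cache
print(df.storageLevel)      # shows the storage level the DataFrame is marked with
df.coalesce(300).explain()  # if the physical plan contains InMemoryTableScan /
                            # InMemoryRelation, the write can reuse the cached data;
                            # if the parquet scan and the pandas UDF show up again,
                            # Spark is planning to recompute them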

On Mon, Jan 31, 2022 at 2:12 AM Benjamin Du  wrote:

> I did check the execution plan; there were 2 stages, and both stages show
> that the pandas UDF (which takes almost all the computation time of the
> DataFrame) is executed.
>
> It didn't seem to be an issue of repartition/coalesce as the DataFrame was
> still computed twice after removing coalesce.
>
>
>


Re: A Persisted Spark DataFrame is computed twice

2022-01-31 Thread Sebastian Piu
Can you share the stages as seen in the Spark UI for the count and coalesce
jobs?

My suggestion of moving things around was just for troubleshooting rather
than a solution, if that wasn't clear before.

On Mon, 31 Jan 2022, 08:07 Benjamin Du,  wrote:

> Removing coalesce didn't help either.
>
>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub
> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
> | Docker Hub <https://hub.docker.com/r/dclong/>
> --
> *From:* Deepak Sharma 
> *Sent:* Sunday, January 30, 2022 12:45 AM
> *To:* Benjamin Du 
> *Cc:* u...@spark.incubator.apache.org 
> *Subject:* Re: A Persisted Spark DataFrame is computed twice
>
> coalesce returns a new dataset.
> That will cause the recomputation.
>
> Thanks
> Deepak
>
> On Sun, 30 Jan 2022 at 14:06, Benjamin Du  wrote:
>
> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(...) \
> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
> .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> BTW, it works well if I manually write the DataFrame to HDFS, read it
> back, coalesce it and write it back to HDFS.
> Originally posted at
> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.
> <https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub
> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
> | Docker Hub <https://hub.docker.com/r/dclong/>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: A Persisted Spark DataFrame is computed twice

2022-01-31 Thread Benjamin Du
I did check the execution plan; there were 2 stages, and both stages show that
the pandas UDF (which takes almost all the computation time of the DataFrame)
is executed.

It didn't seem to be an issue of repartition/coalesce as the DataFrame was 
still computed twice after removing coalesce.
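
Another way to confirm that the UDF really runs twice, independent of the plan,
is an accumulator inside the UDF (a rough sketch; the UDF body below is only a
placeholder for the real scoring logic, and accumulator values are approximate
since task retries also increment them):

import pandas as pd
from pyspark.sql.functions import pandas_udf

udf_batches = spark.sparkContext.accumulator(0)   # counts batches processed on executors

@pandas_udf("double")
def my_pandas_udf(col0: pd.Series, col1: pd.Series) -> pd.Series:
    udf_batches.add(1)
    return col0 + col1                            # placeholder for the real computation

# build df with this UDF as in the original code, then:
df.count()
print("batches after count:", udf_batches.value)
df.coalesce(300).write.mode("overwrite").parquet(output_mod)
print("batches after write:", udf_batches.value)  # roughly doubles if the UDF ran again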




Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


From: Gourav Sengupta 
Sent: Sunday, January 30, 2022 1:08 AM
To: sebastian@gmail.com 
Cc: Benjamin Du ; u...@spark.incubator.apache.org 

Subject: Re: A Persisted Spark DataFrame is computed twice

Hi,

Without getting into suppositions, the best option is to look into the SPARK UI
SQL section.

It is the most wonderful tool to explain what is happening, and why. In SPARK
3.x they have made the UI even better, with different levels of granularity and
detail.

On another note, you might want to read the difference between repartition and 
coalesce before making any kind of assumptions.


Regards,
Gourav Sengupta

On Sun, Jan 30, 2022 at 8:52 AM Sebastian Piu 
mailto:sebastian@gmail.com>> wrote:
It's probably the repartitioning and deserialising the df that you are seeing 
take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see

On Sun, 30 Jan 2022, 08:37 Benjamin Du, 
mailto:legendu@outlook.com>> wrote:
I have some PySpark code like below. Basically, I persist a DataFrame (which is 
time-consuming to compute) to disk, call the method DataFrame.count to trigger 
the caching/persist immediately, and then I coalesce the DataFrame to reduce 
the number of partitions (the original DataFrame has 30,000 partitions) and 
output it to HDFS. Based on the execution time of job stages and the execution 
plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does 
anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
.filter(...) \
.withColumn("new_col", my_pandas_udf("col0", "col1")) \
.persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.<https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>

Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


Re: A Persisted Spark DataFrame is computed twice

2022-01-31 Thread Benjamin Du
Removing coalesce didn't help either.




Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


From: Deepak Sharma 
Sent: Sunday, January 30, 2022 12:45 AM
To: Benjamin Du 
Cc: u...@spark.incubator.apache.org 
Subject: Re: A Persisted Spark DataFrame is computed twice

coalesce returns a new dataset.
That will cause the recomputation.

Thanks
Deepak

On Sun, 30 Jan 2022 at 14:06, Benjamin Du 
mailto:legendu@outlook.com>> wrote:
I have some PySpark code like below. Basically, I persist a DataFrame (which is 
time-consuming to compute) to disk, call the method DataFrame.count to trigger 
the caching/persist immediately, and then I coalesce the DataFrame to reduce 
the number of partitions (the original DataFrame has 30,000 partitions) and 
output it to HDFS. Based on the execution time of job stages and the execution 
plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does 
anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
.filter(...) \
.withColumn("new_col", my_pandas_udf("col0", "col1")) \
.persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.<https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>

Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


--
Thanks
Deepak
www.bigdatabig.com<http://www.bigdatabig.com>
www.keosha.net<http://www.keosha.net>


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Benjamin Du
.persist(StorageLevel.MEMORY_ONLY) \
.checkpoint() \
.write.mode("overwrite").parquet(output_mod)


  1.  cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce 
computes it twice

output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
.filter(col("n0") == n0) \
.filter(col("n1") == n1) \
.filter(col("h1") == h1) \
.filter(col("j1").isin(j1)) \
.filter(col("j0") == j0) \
.filter(col("h0").isin(h0)) \
.filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
.withColumn("test", test_score_r4(col("id0"), col("id1"))) \
.cache()
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


A

Manually writing the output computes it only once. The function repart_hdfs below
is a helper I wrote to write a DataFrame to disk, read it back,
repartition/coalesce it, and then write it back to HDFS.

spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
.filter(col("n1") == n1) \
.filter(col("h1") == h1) \
.filter(col("j1").isin(j1)) \
.filter(col("j0") == j0) \
.filter(col("h0").isin(h0)) \
.filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
.withColumn("test", test_score_r4(col("id0"), col("id1"))) \
.write.mode("overwrite").parquet(output_mod)
repart_hdfs(spark, output_mod, 300, coalesce=True)
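
repart_hdfs itself is not shown in this thread; a rough sketch of what such a
helper could look like (the staging through a temporary sibling path is an
assumption for illustration, not the actual implementation):

def repart_hdfs(spark, path, num_partitions, coalesce=False):
    """Read a parquet dataset at `path`, change its partition count, and write it back.

    Sketch only: the data is first copied to a temporary sibling path because a
    parquet path cannot be overwritten while it is still being read; cleanup of
    the temporary path is left out.
    """
    tmp_path = path.rstrip("/") + "_repart_tmp"   # hypothetical staging location
    spark.read.parquet(path).write.mode("overwrite").parquet(tmp_path)
    df = spark.read.parquet(tmp_path)
    df = df.coalesce(num_partitions) if coalesce else df.repartition(num_partitions)
    df.write.mode("overwrite").parquet(path)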











Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


From: Sebastian Piu 
Sent: Sunday, January 30, 2022 12:44 AM
To: Benjamin Du 
Cc: u...@spark.incubator.apache.org 
Subject: Re: A Persisted Spark DataFrame is computed twice

It's probably the repartitioning and deserialising the df that you are seeing 
take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see

On Sun, 30 Jan 2022, 08:37 Benjamin Du, 
mailto:legendu@outlook.com>> wrote:
I have some PySpark code like below. Basically, I persist a DataFrame (which is 
time-consuming to compute) to disk, call the method DataFrame.count to trigger 
the caching/persist immediately, and then I coalesce the DataFrame to reduce 
the number of partitions (the original DataFrame has 30,000 partitions) and 
output it to HDFS. Based on the execution time of job stages and the execution 
plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does 
anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
.filter(...) \
.withColumn("new_col", my_pandas_udf("col0", "col1")) \
.persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.<https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>

Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Gourav Sengupta
Hi,

Without getting into suppositions, the best option is to look into the
SPARK UI SQL section.

It is the most wonderful tool to explain what is happening, and why. In
SPARK 3.x they have made the UI even better, with different levels of
granularity and detail.

On another note, you might want to read the difference between repartition
and coalesce before making any kind of assumptions.
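
As a concrete illustration of that difference (a sketch using the partition
counts from this thread; the output paths are illustrative):

# coalesce(300) is a narrow dependency: no shuffle, but Spark may collapse the
# upstream computation (including the expensive UDF) down to only 300 tasks
df.coalesce(300).write.mode("overwrite").parquet(output_coalesced)

# repartition(300) inserts a full shuffle: the upstream stage keeps its original
# parallelism (30,000 partitions here) and only the shuffled output has 300 partitions
df.repartition(300).write.mode("overwrite").parquet(output_repartitioned)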


Regards,
Gourav Sengupta

On Sun, Jan 30, 2022 at 8:52 AM Sebastian Piu 
wrote:

> It's probably the repartitioning and deserialising the df that you are
> seeing take time. Try doing this
>
> 1. Add another count after your current one and compare times
> 2. Move coalesce before persist
>
>
>
> You should see
>
> On Sun, 30 Jan 2022, 08:37 Benjamin Du,  wrote:
>
>> I have some PySpark code like below. Basically, I persist a DataFrame
>> (which is time-consuming to compute) to disk, call the method
>> DataFrame.count to trigger the caching/persist immediately, and then I
>> coalesce the DataFrame to reduce the number of partitions (the original
>> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
>> execution time of job stages and the execution plan, it seems to me that
>> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
>> this happens?
>>
>> df = spark.read.parquet("/input/hdfs/path") \
>> .filter(...) \
>> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
>> .persist(StorageLevel.DISK_ONLY)
>> df.count()
>> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>>
>>
>> BTW, it works well if I manually write the DataFrame to HDFS, read it
>> back, coalesce it and write it back to HDFS.
>> Originally posted at
>> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.
>> <https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>
>>
>> Best,
>>
>> 
>>
>> Ben Du
>>
>> Personal Blog <http://www.legendu.net/> | GitHub
>> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
>> | Docker Hub <https://hub.docker.com/r/dclong/>
>>
>


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Deepak Sharma
coalesce returns a new dataset.
That will cause the recomputation.

Thanks
Deepak

On Sun, 30 Jan 2022 at 14:06, Benjamin Du  wrote:

> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(...) \
> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
> .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> BTW, it works well if I manually write the DataFrame to HDFS, read it
> back, coalesce it and write it back to HDFS.
> Originally posted at
> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.
> <https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub
> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
> | Docker Hub <https://hub.docker.com/r/dclong/>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Sebastian Piu
It's probably the repartitioning and deserialising the df that you are
seeing take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see
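
A sketch of those two checks, written against the code from the original post
(the .filter(...) below is the same elision as in that post):

# 1. a second count: if the persisted data is actually being used,
#    the second count should come back much faster than the first one
df.count()
df.count()

# 2. coalesce before persist, so the data is cached with 300 partitions already
df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .coalesce(300) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.write.mode("overwrite").parquet(output_mod)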

On Sun, 30 Jan 2022, 08:37 Benjamin Du,  wrote:

> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(...) \
> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
> .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> BTW, it works well if I manually write the DataFrame to HDFS, read it
> back, coalesce it and write it back to HDFS.
> Originally post at
> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.
> <https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>
>
> Best,
>
> 
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub
> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
> | Docker Hub <https://hub.docker.com/r/dclong/>
>


A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Benjamin Du
I have some PySpark code like below. Basically, I persist a DataFrame (which is 
time-consuming to compute) to disk, call the method DataFrame.count to trigger 
the caching/persist immediately, and then I coalesce the DataFrame to reduce 
the number of partitions (the original DataFrame has 30,000 partitions) and 
output it to HDFS. Based on the execution time of job stages and the execution 
plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does 
anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
.filter(...) \
.withColumn("new_col", my_pandas_udf("col0", "col1")) \
.persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.
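
For reference, a sketch of that manual workaround (the temporary path is
illustrative, and .filter(...) stands for the same filters as above):

tmp_path = f"{output_mod}_tmp"   # illustrative staging location

# write the expensive result once, without persist
spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .write.mode("overwrite").parquet(tmp_path)

# read it back, shrink to 300 partitions, and write the final output
spark.read.parquet(tmp_path) \
    .coalesce(300) \
    .write.mode("overwrite").parquet(output_mod)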

Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.<https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>

Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>