Re: A Persisted Spark DataFrame is computed twice

2022-01-31 Thread Benjamin Du
I did check the execution plan; there were two stages, and both show that the
pandas UDF (which accounts for almost all of the DataFrame's computation time)
is executed.

It didn't seem to be an issue with repartition/coalesce, as the DataFrame was
still computed twice after coalesce was removed.
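
For anyone retracing this, a quick programmatic complement to looking at the
stages: if the persisted data were being reused, the physical plan of the
coalesced DataFrame would contain an InMemoryTableScan over the cached
relation instead of the original scan/filter/pandas UDF. A minimal sketch,
assuming the df built in the quoted snippet below:

# Sketch only: inspect the plan and storage level of the persisted df.
# A reused persist shows up as InMemoryTableScan in the physical plan.
df.coalesce(300).explain(True)
print(df.storageLevel)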




Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


From: Gourav Sengupta 
Sent: Sunday, January 30, 2022 1:08 AM
To: sebastian@gmail.com 
Cc: Benjamin Du ; u...@spark.incubator.apache.org 

Subject: Re: A Persisted Spark DataFrame is computed twice

Hi,

Without getting into suppositions, the best option is to look at the SQL
section of the Spark UI.

It is the most wonderful tool for explaining what is happening, and why. In
Spark 3.x the UI has been made even better, with different levels of
granularity and detail.

On another note, you might want to read about the difference between
repartition and coalesce before making any assumptions.


Regards,
Gourav Sengupta

On Sun, Jan 30, 2022 at 8:52 AM Sebastian Piu <sebastian@gmail.com> wrote:
It's probably the repartitioning and the deserialisation of the df that you
are seeing take time. Try the following (a sketch of option 2 is included
below):

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see
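
A minimal sketch of option 2 above, reusing the names from the original
snippet (the filter condition and my_pandas_udf are kept as placeholders from
the question, so this is illustrative rather than runnable as-is):

from pyspark.storagelevel import StorageLevel

# Coalesce before persisting, so the persisted data already has 300
# partitions and the final write reads it as-is instead of applying
# coalesce on top of the cached 30,000-partition plan.
df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .coalesce(300) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.write.mode("overwrite").parquet(output_mod)

Note that coalesce does not shuffle, so with this ordering the upstream filter
and pandas UDF also run in only 300 tasks; whether that trade-off is
acceptable depends on the job.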

On Sun, 30 Jan 2022, 08:37 Benjamin Du <legendu@outlook.com> wrote:
I have some PySpark code like the snippet below. Basically, I persist a
DataFrame (which is time-consuming to compute) to disk, call DataFrame.count
to trigger the persist immediately, and then coalesce the DataFrame to reduce
the number of partitions (the original DataFrame has 30,000 partitions) and
write it to HDFS. Based on the execution times of the job stages and the
execution plan, it seems to me that the DataFrame is recomputed at
df.coalesce(300). Does anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


Re: A Persisted Spark DataFrame is computed twice

2022-01-31 Thread Benjamin Du
Removing coalesce didn't help either.




Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


From: Deepak Sharma 
Sent: Sunday, January 30, 2022 12:45 AM
To: Benjamin Du 
Cc: u...@spark.incubator.apache.org 
Subject: Re: A Persisted Spark DataFrame is computed twice

coalesce returns a new dataset.
That will cause the recomputation.

Thanks
Deepak

On Sun, 30 Jan 2022 at 14:06, Benjamin Du <legendu@outlook.com> wrote:
I have some PySpark code like the snippet below. Basically, I persist a
DataFrame (which is time-consuming to compute) to disk, call DataFrame.count
to trigger the persist immediately, and then coalesce the DataFrame to reduce
the number of partitions (the original DataFrame has 30,000 partitions) and
write it to HDFS. Based on the execution times of the job stages and the
execution plan, it seems to me that the DataFrame is recomputed at
df.coalesce(300). Does anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


--
Thanks
Deepak
www.bigdatabig.com<http://www.bigdatabig.com>
www.keosha.net<http://www.keosha.net>


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Benjamin Du
    .persist(StorageLevel.MEMORY_ONLY) \
    .checkpoint() \
    .write.mode("overwrite").parquet(output_mod)


  1.  cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce 
computes it twice

output_mod = f"{output}/job={mod}"
df = spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .cache()
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


Writing the output manually computes it only once. The function repart_hdfs
below is a helper I wrote to write a DataFrame to disk, read it back,
repartition/coalesce it, and then write it back to HDFS.

spark.read.parquet("/input/hdfs/path") \
    .filter(col("n0") == n0) \
    .filter(col("n1") == n1) \
    .filter(col("h1") == h1) \
    .filter(col("j1").isin(j1)) \
    .filter(col("j0") == j0) \
    .filter(col("h0").isin(h0)) \
    .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
    .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
    .write.mode("overwrite").parquet(output_mod)
repart_hdfs(spark, output_mod, 300, coalesce=True)
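
The thread does not include the body of repart_hdfs; a minimal, hypothetical
sketch matching the description above (write out, read back,
repartition/coalesce, write back) could look like the following. The
temporary-path handling is my own assumption, not the author's actual
implementation:

def repart_hdfs(spark, path, num_partitions, coalesce=False):
    # Stage a copy of the data at a temporary location; writing back to the
    # path that is still being read lazily would not be safe.
    tmp_path = path.rstrip("/") + "_repart_tmp"
    spark.read.parquet(path).write.mode("overwrite").parquet(tmp_path)
    # Read the copy back, reduce the number of partitions, and overwrite the
    # original path with the repartitioned data (cleanup of tmp_path omitted).
    df = spark.read.parquet(tmp_path)
    df = df.coalesce(num_partitions) if coalesce else df.repartition(num_partitions)
    df.write.mode("overwrite").parquet(path)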


Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


From: Sebastian Piu 
Sent: Sunday, January 30, 2022 12:44 AM
To: Benjamin Du 
Cc: u...@spark.incubator.apache.org 
Subject: Re: A Persisted Spark DataFrame is computed twice

It's probably the repartitioning and the deserialisation of the df that you
are seeing take time. Try the following:

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see

On Sun, 30 Jan 2022, 08:37 Benjamin Du <legendu@outlook.com> wrote:
I have some PySpark code like the snippet below. Basically, I persist a
DataFrame (which is time-consuming to compute) to disk, call DataFrame.count
to trigger the persist immediately, and then coalesce the DataFrame to reduce
the number of partitions (the original DataFrame has 30,000 partitions) and
write it to HDFS. Based on the execution times of the job stages and the
execution plan, it seems to me that the DataFrame is recomputed at
df.coalesce(300). Does anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Benjamin Du
I have some PySpark code like the snippet below. Basically, I persist a
DataFrame (which is time-consuming to compute) to disk, call DataFrame.count
to trigger the persist immediately, and then coalesce the DataFrame to reduce
the number of partitions (the original DataFrame has 30,000 partitions) and
write it to HDFS. Based on the execution times of the job stages and the
execution plan, it seems to me that the DataFrame is recomputed at
df.coalesce(300). Does anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally posted at
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,



Ben Du

Personal Blog | GitHub | 
Bitbucket | Docker 
Hub


Re: [RNG]: How does Spark handle RNGs?

2021-10-04 Thread Benjamin Du
"Operations on the executor will generally calculate and store a seed once"

Can you elaborate more on this? Does Spark try to seed the RNGs to ensure the
overall quality of the random numbers generated? To give an extreme example,
if all workers used the same seed, the RNGs would repeat the same numbers on
each worker, which is obviously a poor choice.
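
To make the concern concrete, here is a minimal, hypothetical sketch using the
RDD API: each partition seeds its own RNG from a base seed plus the partition
index, so partitions produce different yet reproducible streams. The names
(BASE_SEED, draw) and the use of Python's random module are illustration
choices, not Spark internals:

import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

BASE_SEED = 42  # hypothetical base seed for the illustration

def draw(partition_index, rows):
    # Seed a separate RNG per partition from BASE_SEED plus the partition
    # index; seeding every partition with BASE_SEED alone would make all
    # partitions emit the identical "random" stream.
    rng = random.Random(BASE_SEED + partition_index)
    for _ in rows:
        yield rng.random()

samples = sc.parallelize(range(1000), numSlices=4) \
    .mapPartitionsWithIndex(draw) \
    .collect()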


Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


From: Sean Owen 
Sent: Monday, October 4, 2021 1:00 PM
To: Benjamin Du 
Cc: user@spark.apache.org 
Subject: Re: [RNG]: How does Spark handle RNGs?

The 2nd approach. Spark doesn't work in the 1st way in any context; the driver
and executor processes do not cooperate during execution.
Operations on the executors will generally calculate and store a seed once,
and use it in their RNGs, to make the computation reproducible.
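
A small illustration of that behaviour at the DataFrame level (a sketch, not a
claim about every code path): the built-in rand() accepts an explicit seed,
and the per-partition streams are derived from that seed together with the
partition being processed, so a rerun of the same job is reproducible without
every partition repeating the same values.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# rand(seed) is deterministic across reruns of the same job, while different
# partitions still see different values.
df = spark.range(10000).withColumn("u", F.rand(seed=42))
df.select(F.avg("u").alias("mean_u")).show()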

On Mon, Oct 4, 2021 at 2:20 PM Benjamin Du <legendu@outlook.com> wrote:
Hi everyone,

I'd like to ask how Spark (or, more generally, distributed computing engines)
handles RNGs. At a high level, there are two ways:

  1.  Use a single RNG on the driver, and have random number generation on
each worker make requests to that single RNG on the driver.
  2.  Use a separate RNG on each worker.

If the 2nd approach is used, may I ask how Spark seeds the RNGs on different
workers to ensure the overall quality of the random numbers generated?


Best,



Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>


[RNG]: How does Spark handle RNGs?

2021-10-04 Thread Benjamin Du
Hi everyone,

I'd like to ask how Spark (or, more generally, distributed computing engines)
handles RNGs. At a high level, there are two ways:

  1.  Use a single RNG on the driver, and have random number generation on
each worker make requests to that single RNG on the driver.
  2.  Use a separate RNG on each worker.

If the 2nd approach is used, may I ask how Spark seeds the RNGs on different
workers to ensure the overall quality of the random numbers generated?


Best,



Ben Du

Personal Blog | GitHub | 
Bitbucket | Docker 
Hub

