Re: A Persisted Spark DataFrame is computed twice
>         .persist(StorageLevel.MEMORY_ONLY) \
>         .checkpoint() \
>         .coalesce(300).write.mode("overwrite").parquet(output_mod)
>
> 1. persist (to memory) + checkpoint + without coalesce computes the
>    DataFrame twice
>
>     output_mod = f"{output}/job={mod}"
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .persist(StorageLevel.MEMORY_ONLY) \
>         .checkpoint() \
>         .write.mode("overwrite").parquet(output_mod)
>
> 1. cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce
>    computes it twice
>
>     output_mod = f"{output}/job={mod}"
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .cache()
>     df.count()
>     df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
> Manual output computes it only once. The function repart_hdfs below is a
> function I wrote to write a DataFrame to disk, read it back,
> repartition/coalesce it, and then write it back to HDFS.
>
>     spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .write.mode("overwrite").parquet(output_mod)
>     repart_hdfs(spark, output_mod, 300, coalesce=True)
>
> Best,
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/> | Docker Hub <https://hub.docker.com/r/dclong/>
Re: A Persisted Spark DataFrame is computed twice
One guess - you are doing two things here, count() and write(). There is a persist(), but it's async. It won't necessarily wait for the persist to finish before proceeding and may have to recompute at least some partitions for the second op. You could debug further by looking at the stages and seeing what exactly is executing and where it uses cached partitions or not.

On Mon, Jan 31, 2022 at 2:12 AM Benjamin Du wrote:

> I did check the execution plan, there were 2 stages and both stages show
> that the pandas UDF (which takes almost all the computation time of the
> DataFrame) is executed.
>
> It didn't seem to be an issue of repartition/coalesce as the DataFrame was
> still computed twice after removing coalesce.
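Besides the UI, one way to see whether a plan reads cached partitions is to look for the cache scan nodes in the physical plan text. The sketch below is not from the thread: plan_uses_cache and explain_to_string are hypothetical helper names, and it assumes that df.explain() prints through Python's stdout and that cached reads show up as InMemoryTableScan / InMemoryRelation in the plan.

```python
import contextlib
import io

# Node names that appear in a plan that reads from a cached relation.
CACHE_MARKERS = ("InMemoryTableScan", "InMemoryRelation")


def plan_uses_cache(plan_text):
    """Return True if the plan text mentions a cached relation."""
    return any(marker in plan_text for marker in CACHE_MARKERS)


def explain_to_string(df):
    """Capture what df.explain() prints (assumes it prints via Python stdout)."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


# Usage with a real DataFrame (needs a running Spark session):
#   print(plan_uses_cache(explain_to_string(df.coalesce(300))))
```

A plan that contains the cache scan only says Spark intends to read the cache. Partitions that were never stored, or that were evicted, are still recomputed from lineage, so the stage details in the UI remain the final word.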
Re: A Persisted Spark DataFrame is computed twice
Can you share the stages as seen in the Spark UI for the count and coalesce jobs?

My suggestion of moving things around was just for troubleshooting rather than a solution, if that wasn't clear before.

On Mon, 31 Jan 2022, 08:07 Benjamin Du wrote:

> Removing coalesce didn't help either.
Re: A Persisted Spark DataFrame is computed twice
I did check the execution plan, there were 2 stages and both stages show that the pandas UDF (which takes almost all the computation time of the DataFrame) is executed.

It didn't seem to be an issue of repartition/coalesce as the DataFrame was still computed twice after removing coalesce.

Best,
Ben Du

From: Gourav Sengupta
Sent: Sunday, January 30, 2022 1:08 AM
To: sebastian@gmail.com
Cc: Benjamin Du; u...@spark.incubator.apache.org
Subject: Re: A Persisted Spark DataFrame is computed twice

Hi,

without getting into suppositions, the best option is to look into the SPARK UI SQL section. It is the most wonderful tool to explain what is happening, and why. In SPARK 3.x they have made the UI even better, with different set of granularity and details.

On another note, you might want to read the difference between repartition and coalesce before making any kind of assumptions.

Regards,
Gourav Sengupta
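On the difference between repartition and coalesce: repartition(n) does a full shuffle, while coalesce(n) merges existing partitions without one, so each output task simply reads several input partitions. A rough arithmetic sketch of that merging for the 30,000 to 300 case follows. coalesce_groups is a hypothetical helper, and the contiguous grouping is a simplification (real Spark also takes data locality into account).

```python
def coalesce_groups(num_input, num_output):
    """Split input partition indices 0..num_input-1 into contiguous groups.

    Simplified model of a shuffle-free coalesce: every output partition is
    the union of a run of input partitions.
    """
    # A shuffle-free coalesce never increases the partition count.
    num_output = min(num_output, num_input)
    bounds = [i * num_input // num_output for i in range(num_output + 1)]
    return [range(bounds[i], bounds[i + 1]) for i in range(num_output)]


groups = coalesce_groups(30_000, 300)
print(len(groups), len(groups[0]))  # 300 output tasks, 100 input partitions each
```

Because no shuffle separates the two, anything computed in the same stage as the coalesce also runs with the reduced number of tasks, which is worth keeping in mind when reading stage timings.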
Re: A Persisted Spark DataFrame is computed twice
Removing coalesce didn't help either.

Best,
Ben Du

From: Deepak Sharma
Sent: Sunday, January 30, 2022 12:45 AM
To: Benjamin Du
Cc: u...@spark.incubator.apache.org
Subject: Re: A Persisted Spark DataFrame is computed twice

coalesce returns a new dataset.
That will cause the recomputation.

Thanks
Deepak
Re: A Persisted Spark DataFrame is computed twice
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .write.mode("overwrite").parquet(output_mod)

1. cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce computes it twice

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .cache()
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

Manual output computes it only once. The function repart_hdfs below is a function I wrote to write a DataFrame to disk, read it back, repartition/coalesce it, and then write it back to HDFS.

    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .write.mode("overwrite").parquet(output_mod)
    repart_hdfs(spark, output_mod, 300, coalesce=True)

Best,
Ben Du

From: Sebastian Piu
Sent: Sunday, January 30, 2022 12:44 AM
To: Benjamin Du
Cc: u...@spark.incubator.apache.org
Subject: Re: A Persisted Spark DataFrame is computed twice

It's probably the repartitioning and deserialising the df that you are seeing take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist

You should see
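The body of repart_hdfs is not shown anywhere in the thread. Purely as an illustration of the read back, coalesce, write again round trip it is described as doing, a hypothetical stand-in might look like the sketch below. The name repart_hdfs_sketch, the "_repart" sibling directory, and the return value are all assumptions, not the author's implementation. It writes to a sibling path because Spark cannot overwrite a path it is currently reading from.

```python
def repart_hdfs_sketch(spark, path, num_partitions, coalesce=False):
    """Hypothetical stand-in for repart_hdfs (the original is not shown).

    Reads the Parquet data already written at `path`, changes the number of
    partitions, writes the result to a sibling directory, and returns that
    directory's path.
    """
    df = spark.read.parquet(path)
    if coalesce:
        df = df.coalesce(num_partitions)      # merge partitions, no shuffle
    else:
        df = df.repartition(num_partitions)   # full shuffle
    target = f"{path.rstrip('/')}_repart"     # assumed naming convention
    df.write.mode("overwrite").parquet(target)
    return target
```

Swapping the new directory into place of the old one would need a filesystem rename outside of this sketch.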
Re: A Persisted Spark DataFrame is computed twice
Hi,

without getting into suppositions, the best option is to look into the SPARK UI SQL section. It is the most wonderful tool to explain what is happening, and why. In SPARK 3.x they have made the UI even better, with different levels of granularity and detail.

On another note, you might want to read up on the difference between repartition and coalesce before making any kind of assumptions.

Regards,
Gourav Sengupta

On Sun, Jan 30, 2022 at 8:52 AM Sebastian Piu wrote:

> It's probably the repartitioning and deserialising the df that you are
> seeing take time. Try doing this
>
> 1. Add another count after your current one and compare times
> 2. Move coalesce before persist
>
> You should see
Re: A Persisted Spark DataFrame is computed twice
coalesce returns a new dataset.
That will cause the recomputation.

Thanks
Deepak

On Sun, 30 Jan 2022 at 14:06, Benjamin Du wrote:

> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(...) \
>         .withColumn("new_col", my_pandas_udf("col0", "col1")) \
>         .persist(StorageLevel.DISK_ONLY)
>     df.count()
>     df.coalesce(300).write.mode("overwrite").parquet(output_mod)

--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
Re: A Persisted Spark DataFrame is computed twice
It's probably the repartitioning and deserialising the df that you are seeing take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist

You should see

On Sun, 30 Jan 2022, 08:37 Benjamin Du wrote:

> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(...) \
>         .withColumn("new_col", my_pandas_udf("col0", "col1")) \
>         .persist(StorageLevel.DISK_ONLY)
>     df.count()
>     df.coalesce(300).write.mode("overwrite").parquet(output_mod)
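Step 1 above (a second count, then compare times) can be scripted roughly as follows. This is only a sketch: timed and compare_counts are hypothetical helper names, and all they do is wall-clock two consecutive count() calls on whatever DataFrame is passed in.

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start


def compare_counts(df):
    """Time two consecutive count() calls on the same DataFrame."""
    first_rows, first_secs = timed(df.count)
    second_rows, second_secs = timed(df.count)
    return {
        "rows": (first_rows, second_rows),
        "seconds": (first_secs, second_secs),
    }


# Usage with the persisted DataFrame from the original post:
#   print(compare_counts(df))
```

If the second count is much faster than the first, the persisted data is being read. For step 2, the coalesce(300) call would move in front of persist(...) in the chain. As far as I understand, that also means the UDF runs in 300 tasks instead of 30,000, so it is a troubleshooting step rather than a fix.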
A Persisted Spark DataFrame is computed twice
I have some PySpark code like below. Basically, I persist a DataFrame (which is time-consuming to compute) to disk, call the method DataFrame.count to trigger the caching/persist immediately, and then I coalesce the DataFrame to reduce the number of partitions (the original DataFrame has 30,000 partitions) and output it to HDFS. Based on the execution time of job stages and the execution plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does anyone know why this happens?

    from pyspark import StorageLevel  # needed for StorageLevel.DISK_ONLY

    df = spark.read.parquet("/input/hdfs/path") \
        .filter(...) \
        .withColumn("new_col", my_pandas_udf("col0", "col1")) \
        .persist(StorageLevel.DISK_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

BTW, it works well if I manually write the DataFrame to HDFS, read it back, coalesce it and write it back to HDFS.

Originally posted at https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,
Ben Du