[GitHub] [spark] zhengruifeng commented on a diff in pull request #37995: [SPARK-40556][PS][SQL] Unpersist the intermediate datasets cached in `AttachDistributedSequenceExec`

2022-09-28 Thread GitBox


zhengruifeng commented on code in PR #37995:
URL: https://github.com/apache/spark/pull/37995#discussion_r983016797


##
python/pyspark/pandas/series.py:
##
@@ -6442,6 +6445,8 @@ def argmin(self, axis: Axis = None, skipna: bool = True) -> int:
             raise ValueError("axis can only be 0 or 'index'")
         sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME)
         seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
+
+        cached = sdf.cache()

Review Comment:
   > In this way, it can avoid triggering additional executions by zipWithIndex.
   
   We can truncate the SQL plan, but I think we still need an additional action to compute the partition sizes?

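For context, a hedged sketch of why that extra action is needed: a zipWithIndex-style sequence must first learn the per-partition row counts before it can assign global ids. The RDD-API illustration below is not the actual `AttachDistributedSequenceExec` implementation; the names `attach_ids` and `offsets` are purely illustrative.

```python
# Hedged illustration (not the actual AttachDistributedSequenceExec code): a
# zipWithIndex-style sequence needs one job just to learn how many rows each
# partition holds, before a second pass can assign the global ids.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.range(0, 1000, numPartitions=4).rdd

# Job 1: the "additional action" -- count the rows in every partition.
sizes = rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()

# Turn the per-partition counts into starting offsets.
offsets = [0]
for size in sizes[:-1]:
    offsets.append(offsets[-1] + size)

# Job 2 (runs lazily with downstream actions): attach the sequence ids.
def attach_ids(partition_index, rows):
    for i, row in enumerate(rows):
        yield (offsets[partition_index] + i, row)

indexed = rdd.mapPartitionsWithIndex(attach_ids)
```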







[GitHub] [spark] zhengruifeng commented on a diff in pull request #37995: [SPARK-40556][PS][SQL] Unpersist the intermediate datasets cached in `AttachDistributedSequenceExec`

2022-09-27 Thread GitBox


zhengruifeng commented on code in PR #37995:
URL: https://github.com/apache/spark/pull/37995#discussion_r981915981


##
python/pyspark/pandas/series.py:
##
@@ -6442,6 +6445,8 @@ def argmin(self, axis: Axis = None, skipna: bool = True) -> int:
             raise ValueError("axis can only be 0 or 'index'")
         sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME)
         seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
+
+        cached = sdf.cache()

Review Comment:
   Maybe we can avoid recomputation in another way.
   
   For a big input dataframe `sdf`:
   
   1. run a subquery that computes the partition sizes on `sdf.select(lit(0))`; in this case the original expressions in `sdf` would not be evaluated?
   2. with the resulting sizes array (`Array[Long]`), generate the result dataframe just like `zipWithIndex` does (a rough sketch follows below)

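A minimal sketch of the two-step idea above, written against the Python DataFrame/RDD API purely for illustration; the real change would live in `AttachDistributedSequenceExec` on the JVM side. `sdf` is assumed to be an existing DataFrame, and the sketch assumes the trivial projection preserves the partitioning of `sdf`.

```python
# Hedged sketch of the proposal above (illustration only; the actual logic
# would live in AttachDistributedSequenceExec). `sdf` is assumed to be an
# existing DataFrame whose partitioning is preserved by the projection below.
from pyspark.sql import functions as F

# Step 1: count rows per partition on a trivial projection, so the original
# (possibly expensive) expressions in `sdf` are not evaluated.
sizes = (
    sdf.select(F.lit(0))
    .rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)])
    .collect()
)

# Step 2: turn the sizes into per-partition starting offsets and attach the
# sequence column, just like zipWithIndex would.
offsets = [0]
for size in sizes[:-1]:
    offsets.append(offsets[-1] + size)

def attach_seq(partition_index, rows):
    for i, row in enumerate(rows):
        yield (offsets[partition_index] + i, *row)

indexed_rdd = sdf.rdd.mapPartitionsWithIndex(attach_seq)
```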





[GitHub] [spark] zhengruifeng commented on a diff in pull request #37995: [SPARK-40556][PS][SQL] Unpersist the intermediate datasets cached in `AttachDistributedSequenceExec`

2022-09-27 Thread GitBox


zhengruifeng commented on code in PR #37995:
URL: https://github.com/apache/spark/pull/37995#discussion_r981914861


##
python/pyspark/pandas/series.py:
##
@@ -6442,6 +6445,8 @@ def argmin(self, axis: Axis = None, skipna: bool = True) -> int:
             raise ValueError("axis can only be 0 or 'index'")
         sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME)
         seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
+
+        cached = sdf.cache()

Review Comment:
   hmm, the case may be more complex:
   
   1. when an action is triggered and a scalar is returned, as in `argmin`, we can explicitly persist the dataset and unpersist it after the computation (see the sketch below);
   2. otherwise (in most cases) another `DataFrame/Series` is returned, and we cannot unpersist the cached dataset;
   
   I am also wondering whether there would be a significant regression if we did not localCheckpoint or cache the internal dataset at all; indexing operations easily invoke `attach_distributed_sequence_column`/`attach_default_index`:
   
   1. if the dataset is small, recomputation may not be a big deal;
   2. if the dataset is large (say 1 billion rows and/or 100+ columns), persisting the internal datasets may consume too much memory;

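A minimal sketch of option 1 above, assuming `sdf` and `NATURAL_ORDER_COLUMN_NAME` come from the surrounding `argmin` code; the `F.min` aggregation merely stands in for the real argmin logic.

```python
# Hedged sketch of case 1 above: for a scalar-returning action such as
# `argmin`, the intermediate frame can be cached just for the duration of the
# computation and released immediately afterwards. `sdf` and
# NATURAL_ORDER_COLUMN_NAME are assumed from the surrounding code; the min()
# aggregation is only a stand-in for the real argmin logic.
from pyspark.sql import functions as F

cached = sdf.cache()
try:
    # Run the scalar-producing action against the cached frame.
    result_row = cached.select(F.min(NATURAL_ORDER_COLUMN_NAME)).head()
finally:
    # Release the cached intermediate dataset once the scalar is computed.
    cached.unpersist()
```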

