[GitHub] [spark] zhengruifeng commented on a diff in pull request #37995: [SPARK-40556][PS][SQL] Unpersist the intermediate datasets cached in `AttachDistributedSequenceExec`
zhengruifeng commented on code in PR #37995: URL: https://github.com/apache/spark/pull/37995#discussion_r983016797 ## python/pyspark/pandas/series.py: ## @@ -6442,6 +6445,8 @@ def argmin(self, axis: Axis = None, skipna: bool = True) -> int: raise ValueError("axis can only be 0 or 'index'") sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME) seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__") + +cached = sdf.cache() Review Comment: > In this way, it can avoid triggering additional executions by zipWithIndex. we can truncate the SQL plan, but I think we still need additional action to compute the partition sizes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on a diff in pull request #37995: [SPARK-40556][PS][SQL] Unpersist the intermediate datasets cached in `AttachDistributedSequenceExec`
zhengruifeng commented on code in PR #37995: URL: https://github.com/apache/spark/pull/37995#discussion_r983016797 ## python/pyspark/pandas/series.py: ## @@ -6442,6 +6445,8 @@ def argmin(self, axis: Axis = None, skipna: bool = True) -> int: raise ValueError("axis can only be 0 or 'index'") sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME) seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__") + +cached = sdf.cache() Review Comment: >In this way, it can avoid triggering additional executions by zipWithIndex. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on a diff in pull request #37995: [SPARK-40556][PS][SQL] Unpersist the intermediate datasets cached in `AttachDistributedSequenceExec`
zhengruifeng commented on code in PR #37995: URL: https://github.com/apache/spark/pull/37995#discussion_r981915981 ## python/pyspark/pandas/series.py: ## @@ -6442,6 +6445,8 @@ def argmin(self, axis: Axis = None, skipna: bool = True) -> int: raise ValueError("axis can only be 0 or 'index'") sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME) seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__") + +cached = sdf.cache() Review Comment: Maybe we can avoid recomputation in another way: For a big input dataframe `sdf`: 1. have a subquery that compute the size of partitions on `sdf.select(lit(0))`, in this case the original expressions in `sdf` will not be evaluated? 2. with the sizes array `Array[Long]`, generate the result dataframe just like the `zipWithIndex` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on a diff in pull request #37995: [SPARK-40556][PS][SQL] Unpersist the intermediate datasets cached in `AttachDistributedSequenceExec`
zhengruifeng commented on code in PR #37995: URL: https://github.com/apache/spark/pull/37995#discussion_r981914861 ## python/pyspark/pandas/series.py: ## @@ -6442,6 +6445,8 @@ def argmin(self, axis: Axis = None, skipna: bool = True) -> int: raise ValueError("axis can only be 0 or 'index'") sdf = self._internal.spark_frame.select(self.spark.column, NATURAL_ORDER_COLUMN_NAME) seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__") + +cached = sdf.cache() Review Comment: hmm, the case maybe more complex: 1. in case an action is triggered and then a scalar is retuned, like `argmin`, we can explicitly persist the datasets and unpersist it after computation; 2. otherwise, (the most cases) another `DataFrame/Series` is returned, and we can not unpersist the cached datasets; I am wondering if there is significant regression if we do not localcheckpoint or cache the internal dataset? indexing operations easily invoke `attach_distributed_sequence_column`/`attach_default_index` 1. if the dataset is small, recomputation maybe not a big deal; 2. if the dataset is large (like 1 billion rows, or/and 100+columns), it may consume too many memory to persist the internal datasets; -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org