viirya commented on code in PR #37490:
URL: https://github.com/apache/spark/pull/37490#discussion_r945320563
##########
python/pyspark/sql/catalog.py:
##########
@@ -674,59 +868,267 @@ def registerFunction(
warnings.warn("Deprecated in 2.3.0. Use spark.udf.register instead.",
FutureWarning)
return self._sparkSession.udf.register(name, f, returnType)
- @since(2.0)
def isCached(self, tableName: str) -> bool:
- """Returns true if the table is currently cached in-memory.
+ """
+ Returns true if the table is currently cached in-memory.
+
+ .. versionadded:: 2.0.0
+
+ Parameters
+ ----------
+ tableName : str
+ name of the table to get.
+
+ .. versionchanged:: 3.4.0
+ Allow ``tableName`` to be qualified with catalog name.
+
+ Returns
+ -------
+ bool
+
+ Examples
+ --------
+ >>> _ = spark.sql("DROP TABLE IF EXISTS tbl1")
+ >>> _ = spark.sql("CREATE TABLE tbl1 (name STRING, age INT) USING parquet")
+ >>> spark.catalog.cacheTable("tbl1")
+ >>> spark.catalog.isCached("tbl1")
+ True
+
+ Throw an analysis exception when the table does not exist.
+
+ >>> spark.catalog.isCached("not_existing_table")
+ Traceback (most recent call last):
+ ...
+ pyspark.sql.utils.AnalysisException: ...
- .. versionchanged:: 3.4
- Allowed ``tableName`` to be qualified with catalog name.
+ Using the fully qualified name for the table.
+
+ >>> spark.catalog.isCached("spark_catalog.default.tbl1")
+ True
+ >>> spark.catalog.uncacheTable("tbl1")
+ >>> _ = spark.sql("DROP TABLE tbl1")
"""
return self._jcatalog.isCached(tableName)
- @since(2.0)
def cacheTable(self, tableName: str) -> None:
"""Caches the specified table in-memory.
- .. versionchanged:: 3.4
- Allowed ``tableName`` to be qualified with catalog name.
+ .. versionadded:: 2.0.0
+
+ Parameters
+ ----------
+ tableName : str
+ name of the table to get.
+
+ .. versionchanged:: 3.4.0
+ Allow ``tableName`` to be qualified with catalog name.
+
+ Examples
+ --------
+ >>> _ = spark.sql("DROP TABLE IF EXISTS tbl1")
+ >>> _ = spark.sql("CREATE TABLE tbl1 (name STRING, age INT) USING parquet")
+ >>> spark.catalog.cacheTable("tbl1")
+
+ Throw an analysis exception when the table does not exist.
+
+ >>> spark.catalog.cacheTable("not_existing_table")
+ Traceback (most recent call last):
+ ...
+ pyspark.sql.utils.AnalysisException: ...
+
+ Using the fully qualified name for the table.
+
+ >>> spark.catalog.cacheTable("spark_catalog.default.tbl1")
+ >>> spark.catalog.uncacheTable("tbl1")
+ >>> _ = spark.sql("DROP TABLE tbl1")
"""
self._jcatalog.cacheTable(tableName)
- @since(2.0)
def uncacheTable(self, tableName: str) -> None:
"""Removes the specified table from the in-memory cache.
- .. versionchanged:: 3.4
- Allowed ``tableName`` to be qualified with catalog name.
+ .. versionadded:: 2.0.0
+
+ Parameters
+ ----------
+ tableName : str
+ name of the table to get.
+
+ .. versionchanged:: 3.4.0
+ Allow ``tableName`` to be qualified with catalog name.
+
+ Examples
+ --------
+ >>> _ = spark.sql("DROP TABLE IF EXISTS tbl1")
+ >>> _ = spark.sql("CREATE TABLE tbl1 (name STRING, age INT) USING parquet")
+ >>> spark.catalog.cacheTable("tbl1")
+ >>> spark.catalog.uncacheTable("tbl1")
+ >>> spark.catalog.isCached("tbl1")
+ False
+
+ Throw an analysis exception when the table does not exist.
+
+ >>> spark.catalog.uncacheTable("not_existing_table") # doctest: +IGNORE_EXCEPTION_DETAIL
+ Traceback (most recent call last):
+ ...
+ pyspark.sql.utils.AnalysisException: ...
+
+ Using the fully qualified name for the table.
+
+ >>> spark.catalog.uncacheTable("spark_catalog.default.tbl1")
+ >>> spark.catalog.isCached("tbl1")
+ False
+ >>> _ = spark.sql("DROP TABLE tbl1")
"""
self._jcatalog.uncacheTable(tableName)
- @since(2.0)
def clearCache(self) -> None:
- """Removes all cached tables from the in-memory cache."""
+ """Removes all cached tables from the in-memory cache.
+
+ .. versionadded:: 2.0.0
+
+ Examples
+ --------
+ >>> _ = spark.sql("DROP TABLE IF EXISTS tbl1")
+ >>> _ = spark.sql("CREATE TABLE tbl1 (name STRING, age INT) USING parquet")
+ >>> spark.catalog.clearCache()
+ >>> spark.catalog.isCached("tbl1")
+ False
+ >>> _ = spark.sql("DROP TABLE tbl1")
+ """
self._jcatalog.clearCache()
- @since(2.0)
def refreshTable(self, tableName: str) -> None:
"""Invalidates and refreshes all the cached data and metadata of the given table.
- .. versionchanged:: 3.4
- Allowed ``tableName`` to be qualified with catalog name.
+ .. versionadded:: 2.0.0
+
+ Parameters
+ ----------
+ tableName : str
+ name of the table to get.
+
+ .. versionchanged:: 3.4.0
+ Allow ``tableName`` to be qualified with catalog name.
+
+ Examples
+ --------
+ The example below caches a table, and then removes the data.
+
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
+ ... _ = spark.sql("CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION '{}'".format(d))
+ ... _ = spark.sql("INSERT INTO tbl1 SELECT 'abc'")
+ ... spark.catalog.cacheTable("tbl1")
+ ... spark.table("tbl1").show()
+ +---+
+ |col|
+ +---+
+ |abc|
+ +---+
+
+ Because the table is cached, it computes from the cached data as below.
+
+ >>> spark.table("tbl1").count()
+ 1
+
+ After refreshing the table, it shows 0 because the data does not exist anymore.
+
+ >>> spark.catalog.refreshTable("tbl1")
+ >>> spark.table("tbl1").count()
+ 0
+
+ Using the fully qualified name for the table.
+
+ >>> spark.catalog.refreshTable("spark_catalog.default.tbl1")
+ >>> _ = spark.sql("DROP TABLE tbl1")
"""
self._jcatalog.refreshTable(tableName)
- @since("2.1.1")
def recoverPartitions(self, tableName: str) -> None:
"""Recovers all the partitions of the given table and updates the catalog.
+ .. versionadded:: 2.1.1
+
+ Parameters
+ ----------
+ tableName : str
+ name of the table to get.
+
+ Notes
+ -----
Only works with a partitioned table, and not a view.
+
+ Examples
+ --------
+ The example below creates a partitioned table against the existing directory of
+ the partitioned table. After that, it recovers the partitions.
+
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
+ ... spark.range(1).selectExpr(
+ ... "id as key", "id as value").write.partitionBy("key").mode("overwrite").save(d)
+ ... _ = spark.sql(
+ ... "CREATE TABLE tbl1 (key LONG, value LONG)"
+ ... "USING parquet OPTIONS (path '{}') PARTITIONED BY (key)".format(d))
+ ... spark.table("tbl1").show()
+ ... spark.catalog.recoverPartitions("tbl1")
+ ... spark.table("tbl1").show()
+ +-----+---+
+ |value|key|
+ +-----+---+
+ +-----+---+
+ +-----+---+
+ |value|key|
+ +-----+---+
+ | 0| 0|
+ +-----+---+
+ >>> _ = spark.sql("DROP TABLE tbl1")
"""
self._jcatalog.recoverPartitions(tableName)
- @since("2.2.0")
def refreshByPath(self, path: str) -> None:
"""Invalidates and refreshes all the cached data (and the associated metadata) for any
DataFrame that contains the given data source path.
+
+ .. versionadded:: 2.2.0
+
+ Parameters
+ ----------
+ path : str
+ the path to refresh the cache.
+
+ Examples
+ --------
+ The example below caches a table, and then remove the data.
Review Comment:
```suggestion
The example below caches a table, and then removes the data.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]