This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new fd5fe8d2c6 Revert ObjectStorage config variables name (#38415) fd5fe8d2c6 is described below commit fd5fe8d2c698b9c26cee47fd0af2f211e9fee7e6 Author: Pankaj Singh <98807258+pankajas...@users.noreply.github.com> AuthorDate: Sat Mar 23 02:51:48 2024 +0530 Revert ObjectStorage config variables name (#38415) * Revert ObjectStorage config variables name * Fix tests --- airflow/providers/common/io/provider.yaml | 6 +++--- airflow/providers/common/io/xcom/backend.py | 12 ++++++------ docs/apache-airflow-providers-common-io/xcom_backend.rst | 12 ++++++------ docs/apache-airflow/core-concepts/xcoms.rst | 12 ++++++------ docs/spelling_wordlist.txt | 1 - tests/providers/common/io/xcom/test_backend.py | 10 +++++----- 6 files changed, 26 insertions(+), 27 deletions(-) diff --git a/airflow/providers/common/io/provider.yaml b/airflow/providers/common/io/provider.yaml index acd175c509..022205bc33 100644 --- a/airflow/providers/common/io/provider.yaml +++ b/airflow/providers/common/io/provider.yaml @@ -53,14 +53,14 @@ config: common.io: description: Common IO configuration section options: - xcom_objectstore_path: + xcom_objectstorage_path: description: | Path to a location on object storage where XComs can be stored in url format. version_added: 1.3.0 type: string example: "s3://conn_id@bucket/path" default: "" - xcom_objectstore_threshold: + xcom_objectstorage_threshold: description: | Threshold in bytes for storing XComs in object storage. -1 means always store in the database. 0 means always store in object storage. Any positive number means @@ -69,7 +69,7 @@ config: type: integer example: "1000000" default: "-1" - xcom_objectstore_compression: + xcom_objectstorage_compression: description: | Compression algorithm to use when storing XComs in object storage. Supported algorithms are a.o.: snappy, zip, gzip, bz2, and lzma. If not specified, no compression will be used. diff --git a/airflow/providers/common/io/xcom/backend.py b/airflow/providers/common/io/xcom/backend.py index 14d1009ead..061a90ae03 100644 --- a/airflow/providers/common/io/xcom/backend.py +++ b/airflow/providers/common/io/xcom/backend.py @@ -81,7 +81,7 @@ class XComObjectStoreBackend(BaseXCom): :raises ValueError: if the key is not relative to the configured path :raises TypeError: if the url is not a valid url or cannot be split """ - path = conf.get(SECTION, "xcom_objectstore_path", fallback="") + path = conf.get(SECTION, "xcom_objectstorage_path", fallback="") p = ObjectStoragePath(path) # normalize the path @@ -115,8 +115,8 @@ class XComObjectStoreBackend(BaseXCom): # we will always serialize ourselves and not by BaseXCom as the deserialize method # from BaseXCom accepts only XCom objects and not the value directly s_val = json.dumps(value, cls=XComEncoder).encode("utf-8") - path = conf.get(SECTION, "xcom_objectstore_path", fallback="") - compression = conf.get(SECTION, "xcom_objectstore_compression", fallback=None) + path = conf.get(SECTION, "xcom_objectstorage_path", fallback="") + compression = conf.get(SECTION, "xcom_objectstorage_compression", fallback=None) if compression: suffix = "." + _get_compression_suffix(compression) @@ -124,7 +124,7 @@ class XComObjectStoreBackend(BaseXCom): suffix = "" compression = None - threshold = conf.getint(SECTION, "xcom_objectstore_threshold", fallback=-1) + threshold = conf.getint(SECTION, "xcom_objectstorage_threshold", fallback=-1) if path and -1 < threshold < len(s_val): # safeguard against collisions @@ -152,7 +152,7 @@ class XComObjectStoreBackend(BaseXCom): Compression is inferred from the file extension. """ data = BaseXCom.deserialize_value(result) - path = conf.get(SECTION, "xcom_objectstore_path", fallback="") + path = conf.get(SECTION, "xcom_objectstorage_path", fallback="") try: p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(data) @@ -164,7 +164,7 @@ class XComObjectStoreBackend(BaseXCom): @staticmethod def purge(xcom: XCom, session: Session) -> None: - path = conf.get(SECTION, "xcom_objectstore_path", fallback="") + path = conf.get(SECTION, "xcom_objectstorage_path", fallback="") if isinstance(xcom.value, str): try: p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(xcom.value) diff --git a/docs/apache-airflow-providers-common-io/xcom_backend.rst b/docs/apache-airflow-providers-common-io/xcom_backend.rst index 1959f3791d..9216fff711 100644 --- a/docs/apache-airflow-providers-common-io/xcom_backend.rst +++ b/docs/apache-airflow-providers-common-io/xcom_backend.rst @@ -20,11 +20,11 @@ Object Storage XCom Backend The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs. -To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstore_path`` to the desired location. The connection -id is obtained from the user part of the url the you will provide, e.g. ``xcom_objectstore_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstore_threshold`` is required +To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection +id is obtained from the user part of the url the you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstorage_threshold`` is required to be something larger than -1. Any object smaller than the threshold in bytes will be stored in the database and anything larger will be be put in object storage. This will allow a hybrid setup. If an xcom is stored on object storage a reference will be -saved in the database. Finally, you can set ``xcom_objectstore_compression`` to fsspec supported compression methods like ``zip`` or ``snappy`` to +saved in the database. Finally, you can set ``xcom_objectstorage_compression`` to fsspec supported compression methods like ``zip`` or ``snappy`` to compress the data before storing it in object storage. So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip:: @@ -33,9 +33,9 @@ So for example the following configuration will store anything above 1MB in S3 a xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend [common.io] - xcom_objectstore_path = s3://conn_id@mybucket/key - xcom_objectstore_threshold = 1048576 - xcom_objectstore_compression = gzip + xcom_objectstorage_path = s3://conn_id@mybucket/key + xcom_objectstorage_threshold = 1048576 + xcom_objectstorage_compression = gzip .. note:: diff --git a/docs/apache-airflow/core-concepts/xcoms.rst b/docs/apache-airflow/core-concepts/xcoms.rst index f8d5f65632..cd72044f7a 100644 --- a/docs/apache-airflow/core-concepts/xcoms.rst +++ b/docs/apache-airflow/core-concepts/xcoms.rst @@ -64,11 +64,11 @@ Object Storage XCom Backend The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs. -To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstore_path`` to the desired location. The connection -id is obtained from the user part of the url the you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstore_threshold`` is required +To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection +id is obtained from the user part of the url the you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstorage_threshold`` is required to be something larger than -1. Any object smaller than the threshold in bytes will be stored in the database and anything larger will be be put in object storage. This will allow a hybrid setup. If an xcom is stored on object storage a reference will be -saved in the database. Finally, you can set ``xcom_objectstore_compression`` to fsspec supported compression methods like ``zip`` or ``snappy`` to +saved in the database. Finally, you can set ``xcom_objectstorage_compression`` to fsspec supported compression methods like ``zip`` or ``snappy`` to compress the data before storing it in object storage. So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip:: @@ -77,9 +77,9 @@ So for example the following configuration will store anything above 1MB in S3 a xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend [common.io] - xcom_objectstore_path = s3://conn_id@mybucket/key - xcom_objectstore_threshold = 1048576 - xcom_objectstore_compression = gzip + xcom_objectstorage_path = s3://conn_id@mybucket/key + xcom_objectstorage_threshold = 1048576 + xcom_objectstoragee_compression = gzip .. note:: diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 8a20bfc960..f985451620 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1094,7 +1094,6 @@ oauth Oauthlib objectORfile objectstorage -objectstore observability od odbc diff --git a/tests/providers/common/io/xcom/test_backend.py b/tests/providers/common/io/xcom/test_backend.py index de13719779..d85133b194 100644 --- a/tests/providers/common/io/xcom/test_backend.py +++ b/tests/providers/common/io/xcom/test_backend.py @@ -94,14 +94,14 @@ class TestXcomObjectStoreBackend: except DuplicateSectionError: pass conf.set("core", "xcom_backend", "airflow.providers.common.io.xcom.backend.XComObjectStoreBackend") - conf.set("common.io", "xcom_objectstore_path", self.path) - conf.set("common.io", "xcom_objectstore_threshold", "50") + conf.set("common.io", "xcom_objectstorage_path", self.path) + conf.set("common.io", "xcom_objectstorage_threshold", "50") settings.configure_vars() def teardown_method(self): conf.remove_option("core", "xcom_backend") - conf.remove_option("common.io", "xcom_objectstore_path") - conf.remove_option("common.io", "xcom_objectstore_threshold") + conf.remove_option("common.io", "xcom_objectstorage_path") + conf.remove_option("common.io", "xcom_objectstorage_threshold") settings.configure_vars() p = ObjectStoragePath(self.path) if p.exists(): @@ -223,7 +223,7 @@ class TestXcomObjectStoreBackend: assert p.exists() is False @pytest.mark.db_test - @conf_vars({("common.io", "xcom_objectstore_compression"): "gzip"}) + @conf_vars({("common.io", "xcom_objectstorage_compression"): "gzip"}) def test_compression(self, task_instance, session): XCom = resolve_xcom_backend() airflow.models.xcom.XCom = XCom