[airflow] 26/32: Allow to define custom XCom class (#8560)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 64c89db14308c567ac424201ed38cc452ddc6afd Author: Tomek Urbaszek AuthorDate: Tue Apr 28 16:55:05 2020 +0200 Allow to define custom XCom class (#8560) * Allow to define custom XCom class closes: #8059 (cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c) --- airflow/config_templates/config.yml | 7 airflow/config_templates/default_airflow.cfg | 4 +++ airflow/models/xcom.py | 34 ++- docs/concepts.rst| 9 + tests/models/test_xcom.py| 50 5 files changed, 103 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d1c2c90..f54255e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -476,6 +476,13 @@ type: string example: ~ default: "True" +- name: xcom_backend + description: | +Path to custom XCom class that will be used to store and resolve operators results + version_added: 1.10.12 + type: string + example: "path.to.CustomXCom" + default: "airflow.models.xcom.BaseXCom" - name: secrets description: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bf83b34..e18e538 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30 # On each dagrun check against defined SLAs check_slas = True +# Path to custom XCom class that will be used to store and resolve operators results +# Example: xcom_backend = path.to.CustomXCom +xcom_backend = airflow.models.xcom.BaseXCom + [secrets] # Full class name of secrets backend to enable (will precede env vars and metastore in search path) # Example: backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend diff --git 
a/airflow/models/xcom.py b/airflow/models/xcom.py index f4522b5..0b6a81d 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344 XCOM_RETURN_KEY = 'return_value' -class XCom(Base, LoggingMixin): +class BaseXCom(Base, LoggingMixin): """ Base class for XCom objects. """ @@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin): "for XCOM, then you need to enable pickle " "support for XCOM in your airflow config.") raise + +@staticmethod +def deserialize_value(result) -> Any: +# TODO: "pickling" has been deprecated and JSON is preferred. +# "pickling" will be removed in Airflow 2.0. +enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') +if enable_pickling: +return pickle.loads(result.value) + +try: +return json.loads(result.value.decode('UTF-8')) +except ValueError: +log.error("Could not deserialize the XCOM value from JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") +raise + + +def resolve_xcom_backend(): +"""Resolves custom XCom class""" +clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}") +if clazz: +if not issubclass(clazz, BaseXCom): +raise TypeError( +f"Your custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`." +) +return clazz +return BaseXCom + + +XCom = resolve_xcom_backend() diff --git a/docs/concepts.rst b/docs/concepts.rst index e85c5b3..dd48003 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -660,6 +660,15 @@ of what this may look like: Note that XComs are similar to `Variables`_, but are specifically designed for inter-task communication rather than global settings. +Custom XCom backend +''' + +It is possible to change ``XCom`` behaviour os serialization and deserialization of tasks' result. +To do this one have to change ``xcom_backend`` parameter in Airflow config. 
Provided value should point +to a class that is a subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization / +deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value`` +methods. + .. _concepts:variables: Variables diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py new file mode 100644 index 000..206b074 --- /dev/null +++ b/tests/models/test_xcom.py @@ -0,0 +1,50 @@ +# Licensed to the
[airflow] 26/32: Allow to define custom XCom class (#8560)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 64c89db14308c567ac424201ed38cc452ddc6afd Author: Tomek Urbaszek AuthorDate: Tue Apr 28 16:55:05 2020 +0200 Allow to define custom XCom class (#8560) * Allow to define custom XCom class closes: #8059 (cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c) --- airflow/config_templates/config.yml | 7 airflow/config_templates/default_airflow.cfg | 4 +++ airflow/models/xcom.py | 34 ++- docs/concepts.rst| 9 + tests/models/test_xcom.py| 50 5 files changed, 103 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d1c2c90..f54255e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -476,6 +476,13 @@ type: string example: ~ default: "True" +- name: xcom_backend + description: | +Path to custom XCom class that will be used to store and resolve operators results + version_added: 1.10.12 + type: string + example: "path.to.CustomXCom" + default: "airflow.models.xcom.BaseXCom" - name: secrets description: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bf83b34..e18e538 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30 # On each dagrun check against defined SLAs check_slas = True +# Path to custom XCom class that will be used to store and resolve operators results +# Example: xcom_backend = path.to.CustomXCom +xcom_backend = airflow.models.xcom.BaseXCom + [secrets] # Full class name of secrets backend to enable (will precede env vars and metastore in search path) # Example: backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend diff --git 
a/airflow/models/xcom.py b/airflow/models/xcom.py index f4522b5..0b6a81d 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344 XCOM_RETURN_KEY = 'return_value' -class XCom(Base, LoggingMixin): +class BaseXCom(Base, LoggingMixin): """ Base class for XCom objects. """ @@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin): "for XCOM, then you need to enable pickle " "support for XCOM in your airflow config.") raise + +@staticmethod +def deserialize_value(result) -> Any: +# TODO: "pickling" has been deprecated and JSON is preferred. +# "pickling" will be removed in Airflow 2.0. +enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') +if enable_pickling: +return pickle.loads(result.value) + +try: +return json.loads(result.value.decode('UTF-8')) +except ValueError: +log.error("Could not deserialize the XCOM value from JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") +raise + + +def resolve_xcom_backend(): +"""Resolves custom XCom class""" +clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}") +if clazz: +if not issubclass(clazz, BaseXCom): +raise TypeError( +f"Your custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`." +) +return clazz +return BaseXCom + + +XCom = resolve_xcom_backend() diff --git a/docs/concepts.rst b/docs/concepts.rst index e85c5b3..dd48003 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -660,6 +660,15 @@ of what this may look like: Note that XComs are similar to `Variables`_, but are specifically designed for inter-task communication rather than global settings. +Custom XCom backend +''' + +It is possible to change ``XCom`` behaviour os serialization and deserialization of tasks' result. +To do this one have to change ``xcom_backend`` parameter in Airflow config. 
Provided value should point +to a class that is a subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization / +deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value`` +methods. + .. _concepts:variables: Variables diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py new file mode 100644 index 000..206b074 --- /dev/null +++ b/tests/models/test_xcom.py @@ -0,0 +1,50 @@ +# Licensed to the
[airflow] 26/32: Allow to define custom XCom class (#8560)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 64c89db14308c567ac424201ed38cc452ddc6afd Author: Tomek Urbaszek AuthorDate: Tue Apr 28 16:55:05 2020 +0200 Allow to define custom XCom class (#8560) * Allow to define custom XCom class closes: #8059 (cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c) --- airflow/config_templates/config.yml | 7 airflow/config_templates/default_airflow.cfg | 4 +++ airflow/models/xcom.py | 34 ++- docs/concepts.rst| 9 + tests/models/test_xcom.py| 50 5 files changed, 103 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d1c2c90..f54255e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -476,6 +476,13 @@ type: string example: ~ default: "True" +- name: xcom_backend + description: | +Path to custom XCom class that will be used to store and resolve operators results + version_added: 1.10.12 + type: string + example: "path.to.CustomXCom" + default: "airflow.models.xcom.BaseXCom" - name: secrets description: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bf83b34..e18e538 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30 # On each dagrun check against defined SLAs check_slas = True +# Path to custom XCom class that will be used to store and resolve operators results +# Example: xcom_backend = path.to.CustomXCom +xcom_backend = airflow.models.xcom.BaseXCom + [secrets] # Full class name of secrets backend to enable (will precede env vars and metastore in search path) # Example: backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend diff --git 
a/airflow/models/xcom.py b/airflow/models/xcom.py index f4522b5..0b6a81d 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344 XCOM_RETURN_KEY = 'return_value' -class XCom(Base, LoggingMixin): +class BaseXCom(Base, LoggingMixin): """ Base class for XCom objects. """ @@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin): "for XCOM, then you need to enable pickle " "support for XCOM in your airflow config.") raise + +@staticmethod +def deserialize_value(result) -> Any: +# TODO: "pickling" has been deprecated and JSON is preferred. +# "pickling" will be removed in Airflow 2.0. +enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') +if enable_pickling: +return pickle.loads(result.value) + +try: +return json.loads(result.value.decode('UTF-8')) +except ValueError: +log.error("Could not deserialize the XCOM value from JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") +raise + + +def resolve_xcom_backend(): +"""Resolves custom XCom class""" +clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}") +if clazz: +if not issubclass(clazz, BaseXCom): +raise TypeError( +f"Your custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`." +) +return clazz +return BaseXCom + + +XCom = resolve_xcom_backend() diff --git a/docs/concepts.rst b/docs/concepts.rst index e85c5b3..dd48003 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -660,6 +660,15 @@ of what this may look like: Note that XComs are similar to `Variables`_, but are specifically designed for inter-task communication rather than global settings. +Custom XCom backend +''' + +It is possible to change ``XCom`` behaviour os serialization and deserialization of tasks' result. +To do this one have to change ``xcom_backend`` parameter in Airflow config. 
Provided value should point +to a class that is a subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization / +deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value`` +methods. + .. _concepts:variables: Variables diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py new file mode 100644 index 000..206b074 --- /dev/null +++ b/tests/models/test_xcom.py @@ -0,0 +1,50 @@ +# Licensed to the
[airflow] 26/32: Allow to define custom XCom class (#8560)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 64c89db14308c567ac424201ed38cc452ddc6afd Author: Tomek Urbaszek AuthorDate: Tue Apr 28 16:55:05 2020 +0200 Allow to define custom XCom class (#8560) * Allow to define custom XCom class closes: #8059 (cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c) --- airflow/config_templates/config.yml | 7 airflow/config_templates/default_airflow.cfg | 4 +++ airflow/models/xcom.py | 34 ++- docs/concepts.rst| 9 + tests/models/test_xcom.py| 50 5 files changed, 103 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d1c2c90..f54255e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -476,6 +476,13 @@ type: string example: ~ default: "True" +- name: xcom_backend + description: | +Path to custom XCom class that will be used to store and resolve operators results + version_added: 1.10.12 + type: string + example: "path.to.CustomXCom" + default: "airflow.models.xcom.BaseXCom" - name: secrets description: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bf83b34..e18e538 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30 # On each dagrun check against defined SLAs check_slas = True +# Path to custom XCom class that will be used to store and resolve operators results +# Example: xcom_backend = path.to.CustomXCom +xcom_backend = airflow.models.xcom.BaseXCom + [secrets] # Full class name of secrets backend to enable (will precede env vars and metastore in search path) # Example: backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend diff --git 
a/airflow/models/xcom.py b/airflow/models/xcom.py index f4522b5..0b6a81d 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344 XCOM_RETURN_KEY = 'return_value' -class XCom(Base, LoggingMixin): +class BaseXCom(Base, LoggingMixin): """ Base class for XCom objects. """ @@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin): "for XCOM, then you need to enable pickle " "support for XCOM in your airflow config.") raise + +@staticmethod +def deserialize_value(result) -> Any: +# TODO: "pickling" has been deprecated and JSON is preferred. +# "pickling" will be removed in Airflow 2.0. +enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') +if enable_pickling: +return pickle.loads(result.value) + +try: +return json.loads(result.value.decode('UTF-8')) +except ValueError: +log.error("Could not deserialize the XCOM value from JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") +raise + + +def resolve_xcom_backend(): +"""Resolves custom XCom class""" +clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}") +if clazz: +if not issubclass(clazz, BaseXCom): +raise TypeError( +f"Your custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`." +) +return clazz +return BaseXCom + + +XCom = resolve_xcom_backend() diff --git a/docs/concepts.rst b/docs/concepts.rst index e85c5b3..dd48003 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -660,6 +660,15 @@ of what this may look like: Note that XComs are similar to `Variables`_, but are specifically designed for inter-task communication rather than global settings. +Custom XCom backend +''' + +It is possible to change ``XCom`` behaviour os serialization and deserialization of tasks' result. +To do this one have to change ``xcom_backend`` parameter in Airflow config. 
Provided value should point +to a class that is a subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization / +deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value`` +methods. + .. _concepts:variables: Variables diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py new file mode 100644 index 000..206b074 --- /dev/null +++ b/tests/models/test_xcom.py @@ -0,0 +1,50 @@ +# Licensed to the
[airflow] 26/32: Allow to define custom XCom class (#8560)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 64c89db14308c567ac424201ed38cc452ddc6afd Author: Tomek Urbaszek AuthorDate: Tue Apr 28 16:55:05 2020 +0200 Allow to define custom XCom class (#8560) * Allow to define custom XCom class closes: #8059 (cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c) --- airflow/config_templates/config.yml | 7 airflow/config_templates/default_airflow.cfg | 4 +++ airflow/models/xcom.py | 34 ++- docs/concepts.rst| 9 + tests/models/test_xcom.py| 50 5 files changed, 103 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d1c2c90..f54255e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -476,6 +476,13 @@ type: string example: ~ default: "True" +- name: xcom_backend + description: | +Path to custom XCom class that will be used to store and resolve operators results + version_added: 1.10.12 + type: string + example: "path.to.CustomXCom" + default: "airflow.models.xcom.BaseXCom" - name: secrets description: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bf83b34..e18e538 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30 # On each dagrun check against defined SLAs check_slas = True +# Path to custom XCom class that will be used to store and resolve operators results +# Example: xcom_backend = path.to.CustomXCom +xcom_backend = airflow.models.xcom.BaseXCom + [secrets] # Full class name of secrets backend to enable (will precede env vars and metastore in search path) # Example: backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend diff --git 
a/airflow/models/xcom.py b/airflow/models/xcom.py index f4522b5..0b6a81d 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344 XCOM_RETURN_KEY = 'return_value' -class XCom(Base, LoggingMixin): +class BaseXCom(Base, LoggingMixin): """ Base class for XCom objects. """ @@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin): "for XCOM, then you need to enable pickle " "support for XCOM in your airflow config.") raise + +@staticmethod +def deserialize_value(result) -> Any: +# TODO: "pickling" has been deprecated and JSON is preferred. +# "pickling" will be removed in Airflow 2.0. +enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') +if enable_pickling: +return pickle.loads(result.value) + +try: +return json.loads(result.value.decode('UTF-8')) +except ValueError: +log.error("Could not deserialize the XCOM value from JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") +raise + + +def resolve_xcom_backend(): +"""Resolves custom XCom class""" +clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}") +if clazz: +if not issubclass(clazz, BaseXCom): +raise TypeError( +f"Your custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`." +) +return clazz +return BaseXCom + + +XCom = resolve_xcom_backend() diff --git a/docs/concepts.rst b/docs/concepts.rst index e85c5b3..dd48003 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -660,6 +660,15 @@ of what this may look like: Note that XComs are similar to `Variables`_, but are specifically designed for inter-task communication rather than global settings. +Custom XCom backend +''' + +It is possible to change ``XCom`` behaviour os serialization and deserialization of tasks' result. +To do this one have to change ``xcom_backend`` parameter in Airflow config. 
Provided value should point +to a class that is a subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization / +deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value`` +methods. + .. _concepts:variables: Variables diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py new file mode 100644 index 000..206b074 --- /dev/null +++ b/tests/models/test_xcom.py @@ -0,0 +1,50 @@ +# Licensed to the