[airflow] 26/32: Allow to define custom XCom class (#8560)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 64c89db14308c567ac424201ed38cc452ddc6afd
Author: Tomek Urbaszek 
AuthorDate: Tue Apr 28 16:55:05 2020 +0200

Allow to define custom XCom class (#8560)

* Allow to define custom XCom class

closes: #8059
(cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c)
---
 airflow/config_templates/config.yml  |  7 
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/models/xcom.py   | 34 ++-
 docs/concepts.rst|  9 +
 tests/models/test_xcom.py| 50 
 5 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index d1c2c90..f54255e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -476,6 +476,13 @@
   type: string
   example: ~
   default: "True"
+- name: xcom_backend
+  description: |
+Path to custom XCom class that will be used to store and resolve 
operators results
+  version_added: 1.10.12
+  type: string
+  example: "path.to.CustomXCom"
+  default: "airflow.models.xcom.BaseXCom"
 
 - name: secrets
   description: ~
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index bf83b34..e18e538 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30
 # On each dagrun check against defined SLAs
 check_slas = True
 
+# Path to custom XCom class that will be used to store and resolve operators 
results
+# Example: xcom_backend = path.to.CustomXCom
+xcom_backend = airflow.models.xcom.BaseXCom
+
 [secrets]
 # Full class name of secrets backend to enable (will precede env vars and 
metastore in search path)
 # Example: backend = 
airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index f4522b5..0b6a81d 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344
 XCOM_RETURN_KEY = 'return_value'
 
 
-class XCom(Base, LoggingMixin):
+class BaseXCom(Base, LoggingMixin):
 """
 Base class for XCom objects.
 """
@@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin):
   "for XCOM, then you need to enable pickle "
   "support for XCOM in your airflow config.")
 raise
+
+@staticmethod
+def deserialize_value(result) -> Any:
+# TODO: "pickling" has been deprecated and JSON is preferred.
+# "pickling" will be removed in Airflow 2.0.
+enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
+if enable_pickling:
+return pickle.loads(result.value)
+
+try:
+return json.loads(result.value.decode('UTF-8'))
+except ValueError:
+log.error("Could not deserialize the XCOM value from JSON. "
+  "If you are using pickles instead of JSON "
+  "for XCOM, then you need to enable pickle "
+  "support for XCOM in your airflow config.")
+raise
+
+
+def resolve_xcom_backend():
+"""Resolves custom XCom class"""
+clazz = conf.getimport("core", "xcom_backend", 
fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
+if clazz:
+if not issubclass(clazz, BaseXCom):
+raise TypeError(
+f"Your custom XCom class `{clazz.__name__}` is not a subclass 
of `{BaseXCom.__name__}`."
+)
+return clazz
+return BaseXCom
+
+
+XCom = resolve_xcom_backend()
diff --git a/docs/concepts.rst b/docs/concepts.rst
index e85c5b3..dd48003 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -660,6 +660,15 @@ of what this may look like:
 Note that XComs are similar to `Variables`_, but are specifically designed
 for inter-task communication rather than global settings.
 
+Custom XCom backend
+'''''''''''''''''''
+
+It is possible to change ``XCom`` behaviour of serialization and deserialization of tasks' result.
+To do this one has to change ``xcom_backend`` parameter in Airflow config. Provided value should point
+to a class that is subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization /
+deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value``
+methods.
+
 .. _concepts:variables:
 
 Variables
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
new file mode 100644
index 000..206b074
--- /dev/null
+++ b/tests/models/test_xcom.py
@@ -0,0 +1,50 @@
+# Licensed to the 

[airflow] 26/32: Allow to define custom XCom class (#8560)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 64c89db14308c567ac424201ed38cc452ddc6afd
Author: Tomek Urbaszek 
AuthorDate: Tue Apr 28 16:55:05 2020 +0200

Allow to define custom XCom class (#8560)

* Allow to define custom XCom class

closes: #8059
(cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c)
---
 airflow/config_templates/config.yml  |  7 
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/models/xcom.py   | 34 ++-
 docs/concepts.rst|  9 +
 tests/models/test_xcom.py| 50 
 5 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index d1c2c90..f54255e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -476,6 +476,13 @@
   type: string
   example: ~
   default: "True"
+- name: xcom_backend
+  description: |
+Path to custom XCom class that will be used to store and resolve 
operators results
+  version_added: 1.10.12
+  type: string
+  example: "path.to.CustomXCom"
+  default: "airflow.models.xcom.BaseXCom"
 
 - name: secrets
   description: ~
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index bf83b34..e18e538 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30
 # On each dagrun check against defined SLAs
 check_slas = True
 
+# Path to custom XCom class that will be used to store and resolve operators 
results
+# Example: xcom_backend = path.to.CustomXCom
+xcom_backend = airflow.models.xcom.BaseXCom
+
 [secrets]
 # Full class name of secrets backend to enable (will precede env vars and 
metastore in search path)
 # Example: backend = 
airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index f4522b5..0b6a81d 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344
 XCOM_RETURN_KEY = 'return_value'
 
 
-class XCom(Base, LoggingMixin):
+class BaseXCom(Base, LoggingMixin):
 """
 Base class for XCom objects.
 """
@@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin):
   "for XCOM, then you need to enable pickle "
   "support for XCOM in your airflow config.")
 raise
+
+@staticmethod
+def deserialize_value(result) -> Any:
+# TODO: "pickling" has been deprecated and JSON is preferred.
+# "pickling" will be removed in Airflow 2.0.
+enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
+if enable_pickling:
+return pickle.loads(result.value)
+
+try:
+return json.loads(result.value.decode('UTF-8'))
+except ValueError:
+log.error("Could not deserialize the XCOM value from JSON. "
+  "If you are using pickles instead of JSON "
+  "for XCOM, then you need to enable pickle "
+  "support for XCOM in your airflow config.")
+raise
+
+
+def resolve_xcom_backend():
+"""Resolves custom XCom class"""
+clazz = conf.getimport("core", "xcom_backend", 
fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
+if clazz:
+if not issubclass(clazz, BaseXCom):
+raise TypeError(
+f"Your custom XCom class `{clazz.__name__}` is not a subclass 
of `{BaseXCom.__name__}`."
+)
+return clazz
+return BaseXCom
+
+
+XCom = resolve_xcom_backend()
diff --git a/docs/concepts.rst b/docs/concepts.rst
index e85c5b3..dd48003 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -660,6 +660,15 @@ of what this may look like:
 Note that XComs are similar to `Variables`_, but are specifically designed
 for inter-task communication rather than global settings.
 
+Custom XCom backend
+'''''''''''''''''''
+
+It is possible to change ``XCom`` behaviour of serialization and deserialization of tasks' result.
+To do this one has to change ``xcom_backend`` parameter in Airflow config. Provided value should point
+to a class that is subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization /
+deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value``
+methods.
+
 .. _concepts:variables:
 
 Variables
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
new file mode 100644
index 000..206b074
--- /dev/null
+++ b/tests/models/test_xcom.py
@@ -0,0 +1,50 @@
+# Licensed to the 

[airflow] 26/32: Allow to define custom XCom class (#8560)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 64c89db14308c567ac424201ed38cc452ddc6afd
Author: Tomek Urbaszek 
AuthorDate: Tue Apr 28 16:55:05 2020 +0200

Allow to define custom XCom class (#8560)

* Allow to define custom XCom class

closes: #8059
(cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c)
---
 airflow/config_templates/config.yml  |  7 
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/models/xcom.py   | 34 ++-
 docs/concepts.rst|  9 +
 tests/models/test_xcom.py| 50 
 5 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index d1c2c90..f54255e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -476,6 +476,13 @@
   type: string
   example: ~
   default: "True"
+- name: xcom_backend
+  description: |
+Path to custom XCom class that will be used to store and resolve 
operators results
+  version_added: 1.10.12
+  type: string
+  example: "path.to.CustomXCom"
+  default: "airflow.models.xcom.BaseXCom"
 
 - name: secrets
   description: ~
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index bf83b34..e18e538 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30
 # On each dagrun check against defined SLAs
 check_slas = True
 
+# Path to custom XCom class that will be used to store and resolve operators 
results
+# Example: xcom_backend = path.to.CustomXCom
+xcom_backend = airflow.models.xcom.BaseXCom
+
 [secrets]
 # Full class name of secrets backend to enable (will precede env vars and 
metastore in search path)
 # Example: backend = 
airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index f4522b5..0b6a81d 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344
 XCOM_RETURN_KEY = 'return_value'
 
 
-class XCom(Base, LoggingMixin):
+class BaseXCom(Base, LoggingMixin):
 """
 Base class for XCom objects.
 """
@@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin):
   "for XCOM, then you need to enable pickle "
   "support for XCOM in your airflow config.")
 raise
+
+@staticmethod
+def deserialize_value(result) -> Any:
+# TODO: "pickling" has been deprecated and JSON is preferred.
+# "pickling" will be removed in Airflow 2.0.
+enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
+if enable_pickling:
+return pickle.loads(result.value)
+
+try:
+return json.loads(result.value.decode('UTF-8'))
+except ValueError:
+log.error("Could not deserialize the XCOM value from JSON. "
+  "If you are using pickles instead of JSON "
+  "for XCOM, then you need to enable pickle "
+  "support for XCOM in your airflow config.")
+raise
+
+
+def resolve_xcom_backend():
+"""Resolves custom XCom class"""
+clazz = conf.getimport("core", "xcom_backend", 
fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
+if clazz:
+if not issubclass(clazz, BaseXCom):
+raise TypeError(
+f"Your custom XCom class `{clazz.__name__}` is not a subclass 
of `{BaseXCom.__name__}`."
+)
+return clazz
+return BaseXCom
+
+
+XCom = resolve_xcom_backend()
diff --git a/docs/concepts.rst b/docs/concepts.rst
index e85c5b3..dd48003 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -660,6 +660,15 @@ of what this may look like:
 Note that XComs are similar to `Variables`_, but are specifically designed
 for inter-task communication rather than global settings.
 
+Custom XCom backend
+'''''''''''''''''''
+
+It is possible to change ``XCom`` behaviour of serialization and deserialization of tasks' result.
+To do this one has to change ``xcom_backend`` parameter in Airflow config. Provided value should point
+to a class that is subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization /
+deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value``
+methods.
+
 .. _concepts:variables:
 
 Variables
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
new file mode 100644
index 000..206b074
--- /dev/null
+++ b/tests/models/test_xcom.py
@@ -0,0 +1,50 @@
+# Licensed to the 

[airflow] 26/32: Allow to define custom XCom class (#8560)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 64c89db14308c567ac424201ed38cc452ddc6afd
Author: Tomek Urbaszek 
AuthorDate: Tue Apr 28 16:55:05 2020 +0200

Allow to define custom XCom class (#8560)

* Allow to define custom XCom class

closes: #8059
(cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c)
---
 airflow/config_templates/config.yml  |  7 
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/models/xcom.py   | 34 ++-
 docs/concepts.rst|  9 +
 tests/models/test_xcom.py| 50 
 5 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index d1c2c90..f54255e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -476,6 +476,13 @@
   type: string
   example: ~
   default: "True"
+- name: xcom_backend
+  description: |
+Path to custom XCom class that will be used to store and resolve 
operators results
+  version_added: 1.10.12
+  type: string
+  example: "path.to.CustomXCom"
+  default: "airflow.models.xcom.BaseXCom"
 
 - name: secrets
   description: ~
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index bf83b34..e18e538 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30
 # On each dagrun check against defined SLAs
 check_slas = True
 
+# Path to custom XCom class that will be used to store and resolve operators 
results
+# Example: xcom_backend = path.to.CustomXCom
+xcom_backend = airflow.models.xcom.BaseXCom
+
 [secrets]
 # Full class name of secrets backend to enable (will precede env vars and 
metastore in search path)
 # Example: backend = 
airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index f4522b5..0b6a81d 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344
 XCOM_RETURN_KEY = 'return_value'
 
 
-class XCom(Base, LoggingMixin):
+class BaseXCom(Base, LoggingMixin):
 """
 Base class for XCom objects.
 """
@@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin):
   "for XCOM, then you need to enable pickle "
   "support for XCOM in your airflow config.")
 raise
+
+@staticmethod
+def deserialize_value(result) -> Any:
+# TODO: "pickling" has been deprecated and JSON is preferred.
+# "pickling" will be removed in Airflow 2.0.
+enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
+if enable_pickling:
+return pickle.loads(result.value)
+
+try:
+return json.loads(result.value.decode('UTF-8'))
+except ValueError:
+log.error("Could not deserialize the XCOM value from JSON. "
+  "If you are using pickles instead of JSON "
+  "for XCOM, then you need to enable pickle "
+  "support for XCOM in your airflow config.")
+raise
+
+
+def resolve_xcom_backend():
+"""Resolves custom XCom class"""
+clazz = conf.getimport("core", "xcom_backend", 
fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
+if clazz:
+if not issubclass(clazz, BaseXCom):
+raise TypeError(
+f"Your custom XCom class `{clazz.__name__}` is not a subclass 
of `{BaseXCom.__name__}`."
+)
+return clazz
+return BaseXCom
+
+
+XCom = resolve_xcom_backend()
diff --git a/docs/concepts.rst b/docs/concepts.rst
index e85c5b3..dd48003 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -660,6 +660,15 @@ of what this may look like:
 Note that XComs are similar to `Variables`_, but are specifically designed
 for inter-task communication rather than global settings.
 
+Custom XCom backend
+'''''''''''''''''''
+
+It is possible to change ``XCom`` behaviour of serialization and deserialization of tasks' result.
+To do this one has to change ``xcom_backend`` parameter in Airflow config. Provided value should point
+to a class that is subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization /
+deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value``
+methods.
+
 .. _concepts:variables:
 
 Variables
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
new file mode 100644
index 000..206b074
--- /dev/null
+++ b/tests/models/test_xcom.py
@@ -0,0 +1,50 @@
+# Licensed to the 

[airflow] 26/32: Allow to define custom XCom class (#8560)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 64c89db14308c567ac424201ed38cc452ddc6afd
Author: Tomek Urbaszek 
AuthorDate: Tue Apr 28 16:55:05 2020 +0200

Allow to define custom XCom class (#8560)

* Allow to define custom XCom class

closes: #8059
(cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c)
---
 airflow/config_templates/config.yml  |  7 
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/models/xcom.py   | 34 ++-
 docs/concepts.rst|  9 +
 tests/models/test_xcom.py| 50 
 5 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index d1c2c90..f54255e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -476,6 +476,13 @@
   type: string
   example: ~
   default: "True"
+- name: xcom_backend
+  description: |
+Path to custom XCom class that will be used to store and resolve 
operators results
+  version_added: 1.10.12
+  type: string
+  example: "path.to.CustomXCom"
+  default: "airflow.models.xcom.BaseXCom"
 
 - name: secrets
   description: ~
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index bf83b34..e18e538 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30
 # On each dagrun check against defined SLAs
 check_slas = True
 
+# Path to custom XCom class that will be used to store and resolve operators 
results
+# Example: xcom_backend = path.to.CustomXCom
+xcom_backend = airflow.models.xcom.BaseXCom
+
 [secrets]
 # Full class name of secrets backend to enable (will precede env vars and 
metastore in search path)
 # Example: backend = 
airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index f4522b5..0b6a81d 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344
 XCOM_RETURN_KEY = 'return_value'
 
 
-class XCom(Base, LoggingMixin):
+class BaseXCom(Base, LoggingMixin):
 """
 Base class for XCom objects.
 """
@@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin):
   "for XCOM, then you need to enable pickle "
   "support for XCOM in your airflow config.")
 raise
+
+@staticmethod
+def deserialize_value(result) -> Any:
+# TODO: "pickling" has been deprecated and JSON is preferred.
+# "pickling" will be removed in Airflow 2.0.
+enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
+if enable_pickling:
+return pickle.loads(result.value)
+
+try:
+return json.loads(result.value.decode('UTF-8'))
+except ValueError:
+log.error("Could not deserialize the XCOM value from JSON. "
+  "If you are using pickles instead of JSON "
+  "for XCOM, then you need to enable pickle "
+  "support for XCOM in your airflow config.")
+raise
+
+
+def resolve_xcom_backend():
+"""Resolves custom XCom class"""
+clazz = conf.getimport("core", "xcom_backend", 
fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
+if clazz:
+if not issubclass(clazz, BaseXCom):
+raise TypeError(
+f"Your custom XCom class `{clazz.__name__}` is not a subclass 
of `{BaseXCom.__name__}`."
+)
+return clazz
+return BaseXCom
+
+
+XCom = resolve_xcom_backend()
diff --git a/docs/concepts.rst b/docs/concepts.rst
index e85c5b3..dd48003 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -660,6 +660,15 @@ of what this may look like:
 Note that XComs are similar to `Variables`_, but are specifically designed
 for inter-task communication rather than global settings.
 
+Custom XCom backend
+'''''''''''''''''''
+
+It is possible to change ``XCom`` behaviour of serialization and deserialization of tasks' result.
+To do this one has to change ``xcom_backend`` parameter in Airflow config. Provided value should point
+to a class that is subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization /
+deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value``
+methods.
+
 .. _concepts:variables:
 
 Variables
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
new file mode 100644
index 000..206b074
--- /dev/null
+++ b/tests/models/test_xcom.py
@@ -0,0 +1,50 @@
+# Licensed to the