This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 093ab7e755 Add lineage_job_namespace and lineage_job_name OpenLineage 
macros (#38829)
093ab7e755 is described below

commit 093ab7e7556bad9202e83e9fd6d968c50a5f7cb8
Author: Maxim Martynov <martinov_m...@mail.ru>
AuthorDate: Mon Apr 8 20:17:14 2024 +0300

    Add lineage_job_namespace and lineage_job_name OpenLineage macros (#38829)
---
 airflow/providers/openlineage/plugins/macros.py    | 43 ++++++++++++----
 .../providers/openlineage/plugins/openlineage.py   |  9 +++-
 airflow/providers/openlineage/utils/utils.py       |  2 +-
 .../macros.rst                                     | 59 ++++++++++++++--------
 tests/providers/openlineage/plugins/test_macros.py | 36 ++++++++++---
 5 files changed, 109 insertions(+), 40 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/macros.py 
b/airflow/providers/openlineage/plugins/macros.py
index 391b29495f..ddfceb3459 100644
--- a/airflow/providers/openlineage/plugins/macros.py
+++ b/airflow/providers/openlineage/plugins/macros.py
@@ -26,22 +26,41 @@ if TYPE_CHECKING:
     from airflow.models import TaskInstance
 
 
+def lineage_job_namespace():
+    """
+    Macro function which returns Airflow OpenLineage namespace.
+
+    .. seealso::
+        For more information take a look at the guide:
+        :ref:`howto/macros:openlineage`
+    """
+    return conf.namespace()
+
+
+def lineage_job_name(task_instance: TaskInstance):
+    """
+    Macro function which returns Airflow task name in OpenLineage format 
(`<dag_id>.<task_id>`).
+
+    .. seealso::
+        For more information take a look at the guide:
+        :ref:`howto/macros:openlineage`
+    """
+    return get_job_name(task_instance)
+
+
 def lineage_run_id(task_instance: TaskInstance):
     """
-    Macro function which returns the generated run id for a given task.
+    Macro function which returns the generated run id (UUID) for a given task.
 
     This can be used to forward the run id from a task to a child run so the 
job hierarchy is preserved.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information take a look at the guide:
         :ref:`howto/macros:openlineage`
     """
-    if TYPE_CHECKING:
-        assert task_instance.task
-
     return OpenLineageAdapter.build_task_instance_run_id(
         dag_id=task_instance.dag_id,
-        task_id=task_instance.task.task_id,
+        task_id=task_instance.task_id,
         execution_date=task_instance.execution_date,
         try_number=task_instance.try_number,
     )
@@ -56,9 +75,13 @@ def lineage_parent_id(task_instance: TaskInstance):
     run so the job hierarchy is preserved. Child run can easily create 
ParentRunFacet from these information.
 
     .. seealso::
-        For more information on how to use this macro, take a look at the 
guide:
+        For more information take a look at the guide:
         :ref:`howto/macros:openlineage`
     """
-    job_name = get_job_name(task_instance.task)
-    run_id = lineage_run_id(task_instance)
-    return f"{conf.namespace()}/{job_name}/{run_id}"
+    return "/".join(
+        (
+            lineage_job_namespace(),
+            lineage_job_name(task_instance),
+            lineage_run_id(task_instance),
+        )
+    )
diff --git a/airflow/providers/openlineage/plugins/openlineage.py 
b/airflow/providers/openlineage/plugins/openlineage.py
index a0be47a499..5927929588 100644
--- a/airflow/providers/openlineage/plugins/openlineage.py
+++ b/airflow/providers/openlineage/plugins/openlineage.py
@@ -19,7 +19,12 @@ from __future__ import annotations
 from airflow.plugins_manager import AirflowPlugin
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.plugins.listener import 
get_openlineage_listener
-from airflow.providers.openlineage.plugins.macros import lineage_parent_id, 
lineage_run_id
+from airflow.providers.openlineage.plugins.macros import (
+    lineage_job_name,
+    lineage_job_namespace,
+    lineage_parent_id,
+    lineage_run_id,
+)
 
 
 class OpenLineageProviderPlugin(AirflowPlugin):
@@ -32,5 +37,5 @@ class OpenLineageProviderPlugin(AirflowPlugin):
 
     name = "OpenLineageProviderPlugin"
     if not conf.is_disabled():
-        macros = [lineage_run_id, lineage_parent_id]
+        macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, 
lineage_parent_id]
         listeners = [get_openlineage_listener()]
diff --git a/airflow/providers/openlineage/utils/utils.py 
b/airflow/providers/openlineage/utils/utils.py
index fb2263b90d..1c777aff76 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -56,7 +56,7 @@ def get_operator_class(task: BaseOperator) -> type:
     return task.__class__
 
 
-def get_job_name(task):
+def get_job_name(task: TaskInstance) -> str:
     return f"{task.dag_id}.{task.task_id}"
 
 
diff --git a/docs/apache-airflow-providers-openlineage/macros.rst 
b/docs/apache-airflow-providers-openlineage/macros.rst
index 72c1e3a7a5..3ce285f966 100644
--- a/docs/apache-airflow-providers-openlineage/macros.rst
+++ b/docs/apache-airflow-providers-openlineage/macros.rst
@@ -24,30 +24,49 @@ Macros included in OpenLineage plugin get integrated to 
Airflow's main collectio
 
 They can be invoked as a Jinja template, e.g.
 
-Lineage run id
---------------
+Lineage job & run macros
+------------------------
+
+These macros:
+  * ``lineage_job_namespace()``
+  * ``lineage_job_name(task_instance)``
+  * ``lineage_run_id(task_instance)``
+
+allow injecting pieces of run information of a given Airflow task into the 
arguments sent to a remote processing job.
+For example, ``SparkSubmitOperator`` can be set up like this:
+
 .. code-block:: python
 
-        PythonOperator(
-            task_id="render_template",
-            python_callable=my_task_function,
-            op_args=[
-                "{{ 
macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}"
-            ],  # lineage_run_id macro invoked
-            provide_context=False,
-            dag=dag,
-        )
+    SparkSubmitOperator(
+        task_id="my_task",
+        application="/script.py",
+        conf={
+            # separated components
+            "spark.openlineage.parentJobNamespace": "{{ 
macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
+            "spark.openlineage.parentJobName": "{{ 
macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
+            "spark.openlineage.parentRunId": "{{ 
macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
+        },
+    )
 
 Lineage parent id
 -----------------
+
+Same information, but compacted to one string, can be passed using 
``lineage_parent_id(task_instance)`` macro:
+
 .. code-block:: python
 
-        PythonOperator(
-            task_id="render_template",
-            python_callable=my_task_function,
-            op_args=[
-                "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id, 
task_instance) }}"
-            ],  # lineage_parent_id macro invoked
-            provide_context=False,
-            dag=dag,
-        )
+    def my_task_function(templates_dict, **kwargs):
+        parent_job_namespace, parent_job_name, parent_run_id = 
templates_dict["parentRun"].split("/")
+        ...
+
+
+    PythonOperator(
+        task_id="render_template",
+        python_callable=my_task_function,
+        templates_dict={
+            # joined components as one string `<namespace>/<name>/<run_id>`
+            "parentRun": "{{ 
macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
+        },
+        provide_context=False,
+        dag=dag,
+    )
diff --git a/tests/providers/openlineage/plugins/test_macros.py 
b/tests/providers/openlineage/plugins/test_macros.py
index 9e2160aa19..a735312ab6 100644
--- a/tests/providers/openlineage/plugins/test_macros.py
+++ b/tests/providers/openlineage/plugins/test_macros.py
@@ -20,16 +20,38 @@ import uuid
 from unittest import mock
 
 from airflow.providers.openlineage.conf import namespace
-from airflow.providers.openlineage.plugins.macros import lineage_parent_id, 
lineage_run_id
+from airflow.providers.openlineage.plugins.macros import (
+    lineage_job_name,
+    lineage_job_namespace,
+    lineage_parent_id,
+    lineage_run_id,
+)
 
 _DAG_NAMESPACE = namespace()
 
 
+def test_lineage_job_namespace():
+    assert lineage_job_namespace() == _DAG_NAMESPACE
+
+
+def test_lineage_job_name():
+    task_instance = mock.MagicMock(
+        dag_id="dag_id",
+        task_id="task_id",
+        execution_date="execution_date",
+        try_number=1,
+    )
+    assert lineage_job_name(task_instance) == "dag_id.task_id"
+
+
 def test_lineage_run_id():
-    task = mock.MagicMock(
-        dag_id="dag_id", execution_date="execution_date", try_number=1, 
task=mock.MagicMock(task_id="task_id")
+    task_instance = mock.MagicMock(
+        dag_id="dag_id",
+        task_id="task_id",
+        execution_date="execution_date",
+        try_number=1,
     )
-    actual = lineage_run_id(task)
+    actual = lineage_run_id(task_instance)
     expected = str(
         uuid.uuid3(
             uuid.NAMESPACE_URL,
@@ -42,12 +64,12 @@ def test_lineage_run_id():
 @mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id")
 def test_lineage_parent_id(mock_run_id):
     mock_run_id.return_value = "run_id"
-    task = mock.MagicMock(
+    task_instance = mock.MagicMock(
         dag_id="dag_id",
+        task_id="task_id",
         execution_date="execution_date",
         try_number=1,
-        task=mock.MagicMock(task_id="task_id", dag_id="dag_id"),
     )
-    actual = lineage_parent_id(task_instance=task)
+    actual = lineage_parent_id(task_instance)
     expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id"
     assert actual == expected

Reply via email to