This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 093ab7e755 Add lineage_job_namespace and lineage_job_name OpenLineage macros (#38829) 093ab7e755 is described below commit 093ab7e7556bad9202e83e9fd6d968c50a5f7cb8 Author: Maxim Martynov <martinov_m...@mail.ru> AuthorDate: Mon Apr 8 20:17:14 2024 +0300 Add lineage_job_namespace and lineage_job_name OpenLineage macros (#38829) --- airflow/providers/openlineage/plugins/macros.py | 43 ++++++++++++---- .../providers/openlineage/plugins/openlineage.py | 9 +++- airflow/providers/openlineage/utils/utils.py | 2 +- .../macros.rst | 59 ++++++++++++++-------- tests/providers/openlineage/plugins/test_macros.py | 36 ++++++++++--- 5 files changed, 109 insertions(+), 40 deletions(-) diff --git a/airflow/providers/openlineage/plugins/macros.py b/airflow/providers/openlineage/plugins/macros.py index 391b29495f..ddfceb3459 100644 --- a/airflow/providers/openlineage/plugins/macros.py +++ b/airflow/providers/openlineage/plugins/macros.py @@ -26,22 +26,41 @@ if TYPE_CHECKING: from airflow.models import TaskInstance +def lineage_job_namespace(): + """ + Macro function which returns Airflow OpenLineage namespace. + + .. seealso:: + For more information take a look at the guide: + :ref:`howto/macros:openlineage` + """ + return conf.namespace() + + +def lineage_job_name(task_instance: TaskInstance): + """ + Macro function which returns Airflow task name in OpenLineage format (`<dag_id>.<task_id>`). + + .. seealso:: + For more information take a look at the guide: + :ref:`howto/macros:openlineage` + """ + return get_job_name(task_instance) + + def lineage_run_id(task_instance: TaskInstance): """ - Macro function which returns the generated run id for a given task. + Macro function which returns the generated run id (UUID) for a given task. This can be used to forward the run id from a task to a child run so the job hierarchy is preserved. .. 
seealso:: - For more information on how to use this operator, take a look at the guide: + For more information take a look at the guide: :ref:`howto/macros:openlineage` """ - if TYPE_CHECKING: - assert task_instance.task - return OpenLineageAdapter.build_task_instance_run_id( dag_id=task_instance.dag_id, - task_id=task_instance.task.task_id, + task_id=task_instance.task_id, execution_date=task_instance.execution_date, try_number=task_instance.try_number, ) @@ -56,9 +75,13 @@ def lineage_parent_id(task_instance: TaskInstance): run so the job hierarchy is preserved. Child run can easily create ParentRunFacet from these information. .. seealso:: - For more information on how to use this macro, take a look at the guide: + For more information take a look at the guide: :ref:`howto/macros:openlineage` """ - job_name = get_job_name(task_instance.task) - run_id = lineage_run_id(task_instance) - return f"{conf.namespace()}/{job_name}/{run_id}" + return "/".join( + ( + lineage_job_namespace(), + lineage_job_name(task_instance), + lineage_run_id(task_instance), + ) + ) diff --git a/airflow/providers/openlineage/plugins/openlineage.py b/airflow/providers/openlineage/plugins/openlineage.py index a0be47a499..5927929588 100644 --- a/airflow/providers/openlineage/plugins/openlineage.py +++ b/airflow/providers/openlineage/plugins/openlineage.py @@ -19,7 +19,12 @@ from __future__ import annotations from airflow.plugins_manager import AirflowPlugin from airflow.providers.openlineage import conf from airflow.providers.openlineage.plugins.listener import get_openlineage_listener -from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id +from airflow.providers.openlineage.plugins.macros import ( + lineage_job_name, + lineage_job_namespace, + lineage_parent_id, + lineage_run_id, +) class OpenLineageProviderPlugin(AirflowPlugin): @@ -32,5 +37,5 @@ class OpenLineageProviderPlugin(AirflowPlugin): name = "OpenLineageProviderPlugin" if not conf.is_disabled(): 
- macros = [lineage_run_id, lineage_parent_id] + macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id] listeners = [get_openlineage_listener()] diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index fb2263b90d..1c777aff76 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -56,7 +56,7 @@ def get_operator_class(task: BaseOperator) -> type: return task.__class__ -def get_job_name(task): +def get_job_name(task: TaskInstance) -> str: return f"{task.dag_id}.{task.task_id}" diff --git a/docs/apache-airflow-providers-openlineage/macros.rst b/docs/apache-airflow-providers-openlineage/macros.rst index 72c1e3a7a5..3ce285f966 100644 --- a/docs/apache-airflow-providers-openlineage/macros.rst +++ b/docs/apache-airflow-providers-openlineage/macros.rst @@ -24,30 +24,49 @@ Macros included in OpenLineage plugin get integrated to Airflow's main collectio They can be invoked as a Jinja template, e.g. -Lineage run id --------------- +Lineage job & run macros +------------------------ + +These macros: + * ``lineage_job_namespace()`` + * ``lineage_job_name(task_instance)`` + * ``lineage_run_id(task_instance)`` + +allow injecting pieces of run information of a given Airflow task into the arguments sent to a remote processing job. +For example, ``SparkSubmitOperator`` can be set up like this: + .. 
code-block:: python - PythonOperator( - task_id="render_template", - python_callable=my_task_function, - op_args=[ - "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}" - ], # lineage_run_id macro invoked - provide_context=False, - dag=dag, - ) + SparkSubmitOperator( + task_id="my_task", + application="/script.py", + conf={ + # separated components + "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}", + "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}", + "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}", + }, + ) Lineage parent id ----------------- + +The same information, compacted into a single string, can be passed using the ``lineage_parent_id(task_instance)`` macro: + .. code-block:: python - PythonOperator( - task_id="render_template", - python_callable=my_task_function, - op_args=[ - "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id, task_instance) }}" - ], # lineage_parent_id macro invoked - provide_context=False, - dag=dag, - ) + def my_task_function(templates_dict, **kwargs): + parent_job_namespace, parent_job_name, parent_run_id = templates_dict["parentRun"].split("/") + ... 
+ + + PythonOperator( + task_id="render_template", + python_callable=my_task_function, + templates_dict={ + # joined components as one string `<namespace>/<name>/<run_id>` + "parentRun": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}", + }, + provide_context=False, + dag=dag, + ) diff --git a/tests/providers/openlineage/plugins/test_macros.py b/tests/providers/openlineage/plugins/test_macros.py index 9e2160aa19..a735312ab6 100644 --- a/tests/providers/openlineage/plugins/test_macros.py +++ b/tests/providers/openlineage/plugins/test_macros.py @@ -20,16 +20,38 @@ import uuid from unittest import mock from airflow.providers.openlineage.conf import namespace -from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id +from airflow.providers.openlineage.plugins.macros import ( + lineage_job_name, + lineage_job_namespace, + lineage_parent_id, + lineage_run_id, +) _DAG_NAMESPACE = namespace() +def test_lineage_job_namespace(): + assert lineage_job_namespace() == _DAG_NAMESPACE + + +def test_lineage_job_name(): + task_instance = mock.MagicMock( + dag_id="dag_id", + task_id="task_id", + execution_date="execution_date", + try_number=1, + ) + assert lineage_job_name(task_instance) == "dag_id.task_id" + + def test_lineage_run_id(): - task = mock.MagicMock( - dag_id="dag_id", execution_date="execution_date", try_number=1, task=mock.MagicMock(task_id="task_id") + task_instance = mock.MagicMock( + dag_id="dag_id", + task_id="task_id", + execution_date="execution_date", + try_number=1, ) - actual = lineage_run_id(task) + actual = lineage_run_id(task_instance) expected = str( uuid.uuid3( uuid.NAMESPACE_URL, @@ -42,12 +64,12 @@ def test_lineage_run_id(): @mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id") def test_lineage_parent_id(mock_run_id): mock_run_id.return_value = "run_id" - task = mock.MagicMock( + task_instance = mock.MagicMock( dag_id="dag_id", + task_id="task_id", 
execution_date="execution_date", try_number=1, - task=mock.MagicMock(task_id="task_id", dag_id="dag_id"), ) - actual = lineage_parent_id(task_instance=task) + actual = lineage_parent_id(task_instance) expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id" assert actual == expected