(airflow) branch openlineage-process-execution updated (8cbb8bcf73 -> 187d87e5f1)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch openlineage-process-execution
in repository https://gitbox.apache.org/repos/asf/airflow.git

 discard 8cbb8bcf73 openlineage: execute extraction and message sending in separate process
     add 205ad57d04 doc: metrics allow_list complet example (#40120)
     add 2272ea246a AIP-64: Add UI endpoint for task instance history (#40221)
     add 67798b2b8a Fix Scheduler restarting due to too many completed pods in cluster (#40183)
     add d5a75446a6 Fix import future annotations in venv jinja template (#40208)
     add 6f4098487d openlineage, redshift: do not call DB for schemas below Airflow 2.10 (#40197)
     add 187d87e5f1 openlineage: execute extraction and message sending in separate process

This update added new revisions after undoing existing revisions.  That is
to say, some revisions that were in the old version of the branch are not in
the new version.  This situation occurs when a user --force pushes a change
and generates a repository containing something like this:

 * -- * -- B -- O -- O -- O   (8cbb8bcf73)
            \
             N -- N -- N   refs/heads/openlineage-process-execution (187d87e5f1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from the
common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.
Summary of changes:
 airflow/config_templates/config.yml                |   6 +-
 airflow/providers/amazon/aws/hooks/redshift_sql.py |   9 +-
 .../executors/kubernetes_executor_utils.py         |  15 +-
 airflow/providers/openlineage/utils/utils.py       |   9 +-
 airflow/utils/python_virtualenv_script.jinja2      |   1 +
 airflow/www/views.py                               |  35
 .../logging-monitoring/metrics.rst                 |   4 +-
 tests/decorators/test_python_virtualenv.py         |  30 +++
 .../amazon/aws/operators/test_redshift_sql.py      | 220 -
 .../executors/test_kubernetes_executor.py          |  24 ++-
 .../www/views/test_views_ti_history.py}            |  36 ++--
 11 files changed, 250 insertions(+), 139 deletions(-)
 copy tests/{api_connexion/endpoints/test_version_endpoint.py => www/views/test_views_ti_history.py} (52%)
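The "omit"/"discard" distinction in the notice above boils down to ref reachability: a revision is only "discard" (gone forever) once no reference can reach it. A toy sketch of that rule, using a hypothetical commit graph and ref names (not the real repository):

```python
# Toy model of why a force-push marks some revisions "discard" (unreachable
# from every ref) while others are only "omit" (still reachable elsewhere).
# Commit names and refs here are hypothetical, for illustration only.

parents = {
    "B": [], "O1": ["B"], "O2": ["O1"], "O3": ["O2"],   # old line of work
    "N1": ["B"], "N2": ["N1"], "N3": ["N2"],            # force-pushed line
}

def reachable(tips):
    """Return all commits reachable from the given ref tips."""
    seen, stack = set(), list(tips)
    while stack:
        commit = stack.pop()
        if commit not in seen:
            seen.add(commit)
            stack.extend(parents[commit])
    return seen

# After a --force push the branch points at N3, but another ref
# (say, a pull-request ref) still holds O2.
refs_after = {"refs/heads/feature": "N3", "refs/pull/123": "O2"}
live = reachable(refs_after.values())

discarded = {c for c in parents if c not in live}   # gone forever
print(sorted(discarded))  # ['O3']
```

Here O1 and O2 stay alive through `refs/pull/123` (they would be "omit" in the email), while O3 has no remaining reference and is "discard".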
(airflow) branch main updated (d5a75446a6 -> 6f4098487d)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

    from d5a75446a6 Fix import future annotations in venv jinja template (#40208)
     add 6f4098487d openlineage, redshift: do not call DB for schemas below Airflow 2.10 (#40197)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/amazon/aws/hooks/redshift_sql.py |   9 +-
 airflow/providers/openlineage/utils/utils.py       |   9 +-
 .../amazon/aws/operators/test_redshift_sql.py      | 220 -
 3 files changed, 140 insertions(+), 98 deletions(-)
(airflow) branch openlineage-process-execution updated (55e579286a -> 8cbb8bcf73)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch openlineage-process-execution
in repository https://gitbox.apache.org/repos/asf/airflow.git

 discard 55e579286a openlineage: execute extraction and message sending in separate process
    omit 7b0404b883 local task job: add timeout, to not kill on_task_instance_success listener prematurely
     add 14deaa2f1f Add CSRF protection to "/logout" (#40145)
     add c98cd54d0a Support checking for db path absoluteness on Windows (#40069)
     add 1372e100d9 add Coinone (#40176)
     add 794678f49d Resolve deprecations in `SubDagOperator` (#40179)
     add 930db714f7 Resolve deprecations in `BigQuery` operators (#40182)
     add f0b51cdacc openlineage: add some debug logging around sql parser call sites (#40200)
     add c1ffe45e06 Resolve deprecations in `OracleHook` (#40199)
     add 28c1419aac Fix TS linting issues caused by #40145 (#40202)
     add a84d56d077 Update jest and babel minor versions (#40203)
     add c2a93eabd1 Update AWS Executor documentation (#39920)
     add 835f28c8b9 Lazy match escaped quotes in `RedshiftToS3Operator` (#40206)
     add 2149b4dbee Ensures DAG params order regardless of backend (#40156)
     add fa65a20d4a local task job: add timeout, to not kill on_task_instance_success listener prematurely (#39890)
     add feb8307472 Resolve deprecations in `LatestOnlyOperator` tests (#40181)
     add 8cbb8bcf73 openlineage: execute extraction and message sending in separate process

This update added new revisions after undoing existing revisions.  That is
to say, some revisions that were in the old version of the branch are not in
the new version.
This situation occurs when a user --force pushes a change and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (55e579286a)
            \
             N -- N -- N   refs/heads/openlineage-process-execution (8cbb8bcf73)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from the
common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 .pre-commit-config.yaml                            |    2 +-
 INTHEWILD.md                                       |    1 +
 .../amazon/aws/transfers/redshift_to_s3.py         |    2 +-
 .../fab/auth_manager/security_manager/override.py  |   17 +
 airflow/providers/openlineage/sqlparser.py         |   16 +-
 airflow/providers/openlineage/utils/sql.py         |    6 +
 airflow/providers/snowflake/hooks/snowflake.py     |    6 +-
 airflow/serialization/schema.json                  |   14 +-
 airflow/serialization/serialized_objects.py        |   18 +-
 airflow/settings.py                                |   27 +-
 airflow/www/app.py                                 |    3 +-
 airflow/www/package.json                           |   18 +-
 airflow/www/static/css/bootstrap-theme.css         |   17 +-
 airflow/www/static/css/material-icons.css          |    3 +-
 airflow/www/templates/appbuilder/navbar_right.html |    7 +-
 airflow/www/yarn.lock                              | 1781 +++-
 .../executors/batch-executor.rst                   |    2 +-
 .../executors/ecs-executor.rst                     |  284 +---
 .../executors/general.rst                          |   11 +
 newsfragments/40145.significant.rst                |    5 +
 tests/core/test_settings.py                        |   18 +-
 tests/deprecations_ignore.yml                      |   40 -
 tests/models/test_serialized_dag.py                |   16 +
 tests/operators/test_latest_only_operator.py       |    3 +
 tests/operators/test_subdag_operator.py            |   45 +-
 .../amazon/aws/transfers/test_redshift_to_s3.py    |    6 +
 .../test_role_and_permission_endpoint.py           |    2 +
 .../google/cloud/operators/test_bigquery.py        |  379 +++--
 tests/providers/oracle/hooks/test_oracle.py        |    6 +-
 tests/serialization/test_dag_serialization.py      |   42 +-
 tests/www/views/test_session.py                    |    4 +-
 31 files changed, 1414 insertions(+), 1387 deletions(-)
 create mode 100644 newsfragments/40145.significant.rst
(airflow) branch main updated (2149b4dbee -> fa65a20d4a)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

    from 2149b4dbee Ensures DAG params order regardless of backend (#40156)
     add fa65a20d4a local task job: add timeout, to not kill on_task_instance_success listener prematurely (#39890)

No new revisions were added by this update.

Summary of changes:
 airflow/config_templates/config.yml                |   8 ++
 airflow/jobs/local_task_job_runner.py              |   9 +-
 airflow/providers/openlineage/plugins/listener.py  |   1 -
 tests/dags/test_mark_state.py                      |  17 +++
 tests/jobs/test_local_task_job.py                  | 117 -
 .../{throwing_listener.py => slow_listener.py}     |   9 +-
 ...{throwing_listener.py => very_slow_listener.py} |   9 +-
 7 files changed, 155 insertions(+), 15 deletions(-)
 copy tests/listeners/{throwing_listener.py => slow_listener.py} (94%)
 copy tests/listeners/{throwing_listener.py => very_slow_listener.py} (94%)
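The change in fa65a20d4a gives the local task job a bounded wait so a still-running `on_task_instance_success` listener is not killed prematurely. The general pattern (run a callback with a timeout, then give up) can be sketched roughly as below; the function names and timeout values are illustrative, not Airflow's actual API:

```python
# Sketch: wait a bounded time for a possibly slow listener callback before
# moving on. The listener here is hypothetical; Airflow's real mechanism
# differs, this only illustrates the "bounded wait" idea.
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def slow_listener():
    time.sleep(0.2)          # pretend the listener does slow work
    return "listener done"

def run_with_timeout(fn, timeout):
    """Run fn in a worker thread; return its result, or a marker on timeout."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(fn)
        try:
            return future.result(timeout=timeout)
        except FutureTimeout:
            return "gave up waiting"

print(run_with_timeout(slow_listener, timeout=1.0))        # listener done
print(run_with_timeout(lambda: time.sleep(0.5), 0.05))     # gave up waiting
```

The timeout trades completeness for liveness: a very slow listener loses its chance to finish, but the task job is never blocked indefinitely.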
(airflow) branch openlineage-process-execution updated (d863e62738 -> 55e579286a)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch openlineage-process-execution
in repository https://gitbox.apache.org/repos/asf/airflow.git

    omit d863e62738 openlineage: execute extraction and message sending in separate process
    omit 0c51df7c20 local task job: add timeout, to not kill on_task_instance_success listener prematurely
     add 7b0404b883 local task job: add timeout, to not kill on_task_instance_success listener prematurely
     add 55e579286a openlineage: execute extraction and message sending in separate process

This update added new revisions after undoing existing revisions.  That is
to say, some revisions that were in the old version of the branch are not in
the new version.  This situation occurs when a user --force pushes a change
and generates a repository containing something like this:

 * -- * -- B -- O -- O -- O   (d863e62738)
            \
             N -- N -- N   refs/heads/openlineage-process-execution (55e579286a)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from the
common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 tests/jobs/test_local_task_job.py     | 6 +++---
 tests/listeners/very_slow_listener.py | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)
(airflow) branch listener-task-timeout updated (3d4661dac3 -> 7b0404b883)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch listener-task-timeout
in repository https://gitbox.apache.org/repos/asf/airflow.git

    omit 3d4661dac3 local task job: add timeout, to not kill on_task_instance_success listener prematurely
     add 1a7a67f134 Resolve aws provider deprecations in tests (#40123)
     add 74a1931c3b Split Celery integration tests to "celery" and "redis" (#39994)
     add 587a5744e8 Add task documentation to details tab in grid view. (#39899)
     add d504bfa5d1 Aip-61: Add validation on task `executor` field (#40030)
     add b93b94397a added Tiqets to the list of companies using airflow (#40096)
     add 279e8b7772 Chart: Allow valueFrom in env config of components (#40135)
     add 5c5a495c2c Add encryption_configuration parameter to BigQuery operators (#40063)
     add 8e7b48984b Bump google-ads version to use v17 by default (#40158)
     add e9e3c937f3 Fix minor typo in dags.rst (#40169)
     add d509abfa21 Introduce StartTriggerArgs and prevent start trigger initialization in scheduler (#39585)
     add 7c4ea23b4d Upgrade build installers and dependencies (#40177)
     add 7c3dc5d544 Remove unnecessary nginx redirect rule from reverse proxy documentation (#38953)
     add ccf12026b4 [AIP-49] OpenTelemetry Traces for Apache Airflow (#37948)
     add 1533ba6bf3 Bump braces from 3.0.2 to 3.0.3 in /airflow/www (#40180)
     add 52f858c362 Add link to Google Providers Package System Tests Public Dashboard (#40102)
     add 6c7aa4b2f6 feature: callable for template_fields (#37028)
     add e7d036a9ce Fix cloud run system test (#40152)
     add a586ea8b3a Add support for external IdP OIDC token retrieval for Google Cloud Operators. (#39873)
     add 89b32e6363 AIP-64: Add TaskInstance history table (#39951)
     add a90c07e9e4 Much smaller CI output for paralell tests (#40192)
     add 23a0152de2 Bump minimum version of google auth (#40190)
     add 7b0404b883 local task job: add timeout, to not kill on_task_instance_success listener prematurely

This update added new revisions after undoing existing revisions.  That is
to say, some revisions that were in the old version of the branch are not in
the new version.  This situation occurs when a user --force pushes a change
and generates a repository containing something like this:

 * -- * -- B -- O -- O -- O   (3d4661dac3)
            \
             N -- N -- N   refs/heads/listener-task-timeout (7b0404b883)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from the
common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 Dockerfile                                         |   3 +-
 Dockerfile.ci                                      |   6 +-
 INTHEWILD.md                                       |   1 +
 airflow/api_connexion/openapi/v1.yaml              |   8 +
 airflow/api_connexion/schemas/task_schema.py       |   1 +
 airflow/config_templates/config.yml                |  58 +
 airflow/decorators/base.py                         |   4 +-
 airflow/executors/executor_loader.py               |   4 +-
 .../0146_2_10_0_add_task_instance_history.py       | 105 +
 airflow/models/__init__.py                         |   2 +
 airflow/models/abstractoperator.py                 |  20 +-
 airflow/models/baseoperator.py                     |  10 +-
 airflow/models/dag.py                              |  14 +
 airflow/models/dagrun.py                           |  16 +-
 airflow/models/mappedoperator.py                   |  17 +-
 airflow/models/taskinstance.py                     |  52 +-
 airflow/models/taskinstancehistory.py              | 131 +
 airflow/providers/google/ads/hooks/ads.py          |   8 +-
 .../providers/google/cloud/operators/bigquery.py   |  86 +-
 .../google/cloud/utils/credentials_provider.py     |  68 +
 .../google/cloud/utils/external_token_supplier.py  | 175 +
 .../providers/google/common/hooks/base_google.py   |  30 +
 airflow/providers/google/provider.yaml             |   4 +-
 airflow/serialization/serialized_objects.py        |  31 +-
 .../airflow_breeze => airflow/traces}/__init__.py  |   4 +-
 airflow/traces/otel_tracer.py                      | 302 ++
 airflow/traces/tracer.py                           | 280 ++
 airflow/traces/utils.py                            | 112 +
 airflow/triggers/base.py                           |  20 +
 airflow/utils/dates.py                             |   5 +
 airflow/utils/db.py                                |   2 +-
 airflow/utils/db_cleanup.py                        |   1 +
 airflow/www/static/js/api/index.ts                 |   2 +
 .../static/js/api/{useDag.ts => useTaskDetai
(airflow) branch main updated: openlineage: add some debug logging around sql parser call sites (#40200)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new f0b51cdacc openlineage: add some debug logging around sql parser call sites (#40200)

f0b51cdacc is described below

commit f0b51cdacc6155e4e4495a88109a01decab9e201
Author: Maciej Obuchowski
AuthorDate: Wed Jun 12 16:50:59 2024 +0200

    openlineage: add some debug logging around sql parser call sites (#40200)

    Signed-off-by: Maciej Obuchowski
---
 airflow/providers/openlineage/sqlparser.py     | 16
 airflow/providers/openlineage/utils/sql.py     |  6 ++
 airflow/providers/snowflake/hooks/snowflake.py |  6 +++---
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py
index f181ff8cce..470b93d3cb 100644
--- a/airflow/providers/openlineage/sqlparser.py
+++ b/airflow/providers/openlineage/sqlparser.py
@@ -39,6 +39,7 @@ from airflow.providers.openlineage.utils.sql import (
     get_table_schemas,
 )
 from airflow.typing_compat import TypedDict
+from airflow.utils.log.logging_mixin import LoggingMixin

 if TYPE_CHECKING:
     from sqlalchemy.engine import Engine
@@ -116,7 +117,7 @@ def from_table_meta(
     return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper())

-class SQLParser:
+class SQLParser(LoggingMixin):
     """Interface for openlineage-sql.

     :param dialect: dialect specific to the database
@@ -124,11 +125,18 @@ class SQLParser:
     """

     def __init__(self, dialect: str | None = None, default_schema: str | None = None) -> None:
+        super().__init__()
         self.dialect = dialect
         self.default_schema = default_schema

     def parse(self, sql: list[str] | str) -> SqlMeta | None:
         """Parse a single or a list of SQL statements."""
+        self.log.debug(
+            "OpenLineage calling SQL parser with SQL %s dialect %s schema %s",
+            sql,
+            self.dialect,
+            self.default_schema,
+        )
         return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema)

     def parse_table_schemas(
@@ -151,6 +159,7 @@ class SQLParser:
             "database": database or database_info.database,
             "use_flat_cross_db_query": database_info.use_flat_cross_db_query,
         }
+        self.log.info("PRE getting schemas for input and output tables")
         return get_table_schemas(
             hook,
             namespace,
@@ -335,9 +344,8 @@ class SQLParser:
             return split_statement(sql)
         return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if obj != ""]

-    @classmethod
     def create_information_schema_query(
-        cls,
+        self,
         tables: list[DbTableMeta],
         normalize_name: Callable[[str], str],
         is_cross_db: bool,
@@ -349,7 +357,7 @@ class SQLParser:
         sqlalchemy_engine: Engine | None = None,
     ) -> str:
         """Create SELECT statement to query information schema table."""
-        tables_hierarchy = cls._get_tables_hierarchy(
+        tables_hierarchy = self._get_tables_hierarchy(
             tables,
             normalize_name=normalize_name,
             database=database,
diff --git a/airflow/providers/openlineage/utils/sql.py b/airflow/providers/openlineage/utils/sql.py
index f959745b93..f5d083b4e4 100644
--- a/airflow/providers/openlineage/utils/sql.py
+++ b/airflow/providers/openlineage/utils/sql.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations

+import logging
 from collections import defaultdict
 from contextlib import closing
 from enum import IntEnum
@@ -33,6 +34,9 @@ if TYPE_CHECKING:
     from airflow.hooks.base import BaseHook

+log = logging.getLogger(__name__)
+
+
 class ColumnIndex(IntEnum):
     """Enumerates the indices of columns in information schema view."""
@@ -90,6 +94,7 @@ def get_table_schemas(
     if not in_query and not out_query:
         return [], []

+    log.debug("Starting to query database for table schemas")
     with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
         if in_query:
             cursor.execute(in_query)
@@ -101,6 +106,7 @@ def get_table_schemas(
         out_datasets = [x.to_dataset(namespace, database, schema) for x in parse_query_result(cursor)]
     else:
         out_datasets = []
+    log.debug("Got table schema query result from database.")
     return in_datasets, out_datasets

diff --git a/airflow/providers/snowflake/hooks/snowflake.py b/airflow/providers/snowflake/hooks/snowfla
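The commit above threads logging through two common idioms: a module-level logger for free functions (as `utils/sql.py` now does) and Airflow's `LoggingMixin`, which exposes a lazily created `self.log` on classes. A minimal stand-in for the mixin, assuming nothing beyond the stdlib (the `LoggingMixin` below is a simplified sketch, not Airflow's actual implementation):

```python
import logging

# Module-level logger for free functions, as in utils/sql.py.
log = logging.getLogger(__name__)

class LoggingMixin:
    """Rough stand-in for airflow.utils.log.logging_mixin.LoggingMixin:
    a lazily created, per-class logger available as `self.log`."""
    @property
    def log(self):
        if not hasattr(self, "_log"):
            self._log = logging.getLogger(
                f"{self.__class__.__module__}.{self.__class__.__name__}"
            )
        return self._log

class SQLParser(LoggingMixin):
    def __init__(self, dialect=None, default_schema=None):
        super().__init__()
        self.dialect = dialect
        self.default_schema = default_schema

    def parse(self, sql):
        # Lazy %-style formatting: the args are only rendered when
        # DEBUG is actually enabled for this logger.
        self.log.debug("parsing SQL %s with dialect %s", sql, self.dialect)
        return sql       # the real parser returns SqlMeta; this is a stub

parser = SQLParser(dialect="redshift")
print(parser.log.name)   # logger name ends with ".SQLParser"
```

Naming the logger after the class (rather than one shared module logger) lets operators raise verbosity for `SQLParser` alone via the logging config.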
(airflow) branch ol-improve-sqlparser-logging created (now 8988a3682e)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch ol-improve-sqlparser-logging
in repository https://gitbox.apache.org/repos/asf/airflow.git

      at 8988a3682e openlineage: add some debug logging around sql parser call sites

This branch includes the following new commits:

     new 8988a3682e openlineage: add some debug logging around sql parser call sites

The 1 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails.  The revisions listed as "add" were
already present in the repository and have only been added to this reference.
(airflow) 01/01: openlineage: add some debug logging around sql parser call sites
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch ol-improve-sqlparser-logging
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8988a3682ed731552bdd29f5eefedff0fa75f02c
Author: Maciej Obuchowski
AuthorDate: Wed Jun 12 16:29:13 2024 +0200

    openlineage: add some debug logging around sql parser call sites

    Signed-off-by: Maciej Obuchowski
---
 airflow/providers/openlineage/sqlparser.py     | 16
 airflow/providers/openlineage/utils/sql.py     |  6 ++
 airflow/providers/snowflake/hooks/snowflake.py |  6 +++---
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py
index f181ff8cce..470b93d3cb 100644
--- a/airflow/providers/openlineage/sqlparser.py
+++ b/airflow/providers/openlineage/sqlparser.py
@@ -39,6 +39,7 @@ from airflow.providers.openlineage.utils.sql import (
     get_table_schemas,
 )
 from airflow.typing_compat import TypedDict
+from airflow.utils.log.logging_mixin import LoggingMixin

 if TYPE_CHECKING:
     from sqlalchemy.engine import Engine
@@ -116,7 +117,7 @@ def from_table_meta(
     return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper())

-class SQLParser:
+class SQLParser(LoggingMixin):
     """Interface for openlineage-sql.

     :param dialect: dialect specific to the database
@@ -124,11 +125,18 @@ class SQLParser:
     """

     def __init__(self, dialect: str | None = None, default_schema: str | None = None) -> None:
+        super().__init__()
         self.dialect = dialect
         self.default_schema = default_schema

     def parse(self, sql: list[str] | str) -> SqlMeta | None:
         """Parse a single or a list of SQL statements."""
+        self.log.debug(
+            "OpenLineage calling SQL parser with SQL %s dialect %s schema %s",
+            sql,
+            self.dialect,
+            self.default_schema,
+        )
         return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema)

     def parse_table_schemas(
@@ -151,6 +159,7 @@ class SQLParser:
             "database": database or database_info.database,
             "use_flat_cross_db_query": database_info.use_flat_cross_db_query,
         }
+        self.log.info("PRE getting schemas for input and output tables")
         return get_table_schemas(
             hook,
             namespace,
@@ -335,9 +344,8 @@ class SQLParser:
             return split_statement(sql)
         return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if obj != ""]

-    @classmethod
     def create_information_schema_query(
-        cls,
+        self,
         tables: list[DbTableMeta],
         normalize_name: Callable[[str], str],
         is_cross_db: bool,
@@ -349,7 +357,7 @@ class SQLParser:
         sqlalchemy_engine: Engine | None = None,
     ) -> str:
         """Create SELECT statement to query information schema table."""
-        tables_hierarchy = cls._get_tables_hierarchy(
+        tables_hierarchy = self._get_tables_hierarchy(
             tables,
             normalize_name=normalize_name,
             database=database,
diff --git a/airflow/providers/openlineage/utils/sql.py b/airflow/providers/openlineage/utils/sql.py
index f959745b93..f5d083b4e4 100644
--- a/airflow/providers/openlineage/utils/sql.py
+++ b/airflow/providers/openlineage/utils/sql.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations

+import logging
 from collections import defaultdict
 from contextlib import closing
 from enum import IntEnum
@@ -33,6 +34,9 @@ if TYPE_CHECKING:
     from airflow.hooks.base import BaseHook

+log = logging.getLogger(__name__)
+
+
 class ColumnIndex(IntEnum):
     """Enumerates the indices of columns in information schema view."""
@@ -90,6 +94,7 @@ def get_table_schemas(
     if not in_query and not out_query:
         return [], []

+    log.debug("Starting to query database for table schemas")
     with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
         if in_query:
             cursor.execute(in_query)
@@ -101,6 +106,7 @@ def get_table_schemas(
         out_datasets = [x.to_dataset(namespace, database, schema) for x in parse_query_result(cursor)]
     else:
         out_datasets = []
+    log.debug("Got table schema query result from database.")
     return in_datasets, out_datasets

diff --git a/airflow/providers/snowflake/hooks/snowflake.py b/airflow/providers/snowflake/hooks/snowflake.py
index 978bcf75e1..e2a4a453fb 100644
--- a/airflow/providers/snowflake/hooks/snowflake.py
+++ b/airflow/providers/snowflake/hooks/snowflake.py
@@ -473,10 +473,10 @@ cla
(airflow) branch openlineage-process-execution updated (7c13f5d4ea -> d863e62738)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch openlineage-process-execution
in repository https://gitbox.apache.org/repos/asf/airflow.git

 discard 7c13f5d4ea openlineage: execute extraction and message sending in separate process
    omit 3d4661dac3 local task job: add timeout, to not kill on_task_instance_success listener prematurely
     add 1a7a67f134 Resolve aws provider deprecations in tests (#40123)
     add 74a1931c3b Split Celery integration tests to "celery" and "redis" (#39994)
     add 587a5744e8 Add task documentation to details tab in grid view. (#39899)
     add d504bfa5d1 Aip-61: Add validation on task `executor` field (#40030)
     add b93b94397a added Tiqets to the list of companies using airflow (#40096)
     add 279e8b7772 Chart: Allow valueFrom in env config of components (#40135)
     add 5c5a495c2c Add encryption_configuration parameter to BigQuery operators (#40063)
     add 8e7b48984b Bump google-ads version to use v17 by default (#40158)
     add e9e3c937f3 Fix minor typo in dags.rst (#40169)
     add d509abfa21 Introduce StartTriggerArgs and prevent start trigger initialization in scheduler (#39585)
     add 7c4ea23b4d Upgrade build installers and dependencies (#40177)
     add 7c3dc5d544 Remove unnecessary nginx redirect rule from reverse proxy documentation (#38953)
     add ccf12026b4 [AIP-49] OpenTelemetry Traces for Apache Airflow (#37948)
     add 1533ba6bf3 Bump braces from 3.0.2 to 3.0.3 in /airflow/www (#40180)
     add 52f858c362 Add link to Google Providers Package System Tests Public Dashboard (#40102)
     add 6c7aa4b2f6 feature: callable for template_fields (#37028)
     add e7d036a9ce Fix cloud run system test (#40152)
     add a586ea8b3a Add support for external IdP OIDC token retrieval for Google Cloud Operators. (#39873)
     add 89b32e6363 AIP-64: Add TaskInstance history table (#39951)
     add a90c07e9e4 Much smaller CI output for paralell tests (#40192)
     add 23a0152de2 Bump minimum version of google auth (#40190)
     add 0c51df7c20 local task job: add timeout, to not kill on_task_instance_success listener prematurely
     add d863e62738 openlineage: execute extraction and message sending in separate process

This update added new revisions after undoing existing revisions.  That is
to say, some revisions that were in the old version of the branch are not in
the new version.  This situation occurs when a user --force pushes a change
and generates a repository containing something like this:

 * -- * -- B -- O -- O -- O   (7c13f5d4ea)
            \
             N -- N -- N   refs/heads/openlineage-process-execution (d863e62738)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from the
common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 Dockerfile                                         |   3 +-
 Dockerfile.ci                                      |   6 +-
 INTHEWILD.md                                       |   1 +
 airflow/api_connexion/openapi/v1.yaml              |   8 +
 airflow/api_connexion/schemas/task_schema.py       |   1 +
 airflow/config_templates/config.yml                |  58 +
 airflow/decorators/base.py                         |   4 +-
 airflow/executors/executor_loader.py               |   4 +-
 .../0146_2_10_0_add_task_instance_history.py       | 105 +
 airflow/models/__init__.py                         |   2 +
 airflow/models/abstractoperator.py                 |  20 +-
 airflow/models/baseoperator.py                     |  10 +-
 airflow/models/dag.py                              |  14 +
 airflow/models/dagrun.py                           |  16 +-
 airflow/models/mappedoperator.py                   |  17 +-
 airflow/models/taskinstance.py                     |  52 +-
 airflow/models/taskinstancehistory.py              | 131 +
 airflow/providers/google/ads/hooks/ads.py          |   8 +-
 .../providers/google/cloud/operators/bigquery.py   |  86 +-
 .../google/cloud/utils/credentials_provider.py     |  68 +
 .../google/cloud/utils/external_token_supplier.py  | 175 +
 .../providers/google/common/hooks/base_google.py   |  30 +
 airflow/providers/google/provider.yaml             |   4 +-
 airflow/providers/openlineage/plugins/listener.py  |  17 +-
 airflow/providers/openlineage/sqlparser.py         |  16 +-
 airflow/providers/openlineage/utils/sql.py         |   6 -
 airflow/providers/snowflake/hooks/snowflake.py     |   6 +-
 airflow/serialization/serialized_objects.py        |  31 +-
 .../airflow_breeze => airflow/traces}/__init__.py  |   4 +-
 airflow/traces/otel_tracer.py                      | 302 ++
 airflow/traces/tracer.py
(airflow) branch redshift-remove-calls created (now 7e9622b6c7)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch redshift-remove-calls
in repository https://gitbox.apache.org/repos/asf/airflow.git

      at 7e9622b6c7 openlineage, redshift: do not call DB for schemas below Airflow 2.10

This branch includes the following new commits:

     new 7e9622b6c7 openlineage, redshift: do not call DB for schemas below Airflow 2.10

The 1 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails.  The revisions listed as "add" were
already present in the repository and have only been added to this reference.
(airflow) 01/01: openlineage, redshift: do not call DB for schemas below Airflow 2.10
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch redshift-remove-calls
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7e9622b6c79e81295ad7ad5c18974e4a8aaf2977
Author: Maciej Obuchowski
AuthorDate: Wed Jun 12 15:29:46 2024 +0200

    openlineage, redshift: do not call DB for schemas below Airflow 2.10

    Signed-off-by: Maciej Obuchowski
---
 airflow/providers/amazon/aws/hooks/redshift_sql.py |   9 +-
 airflow/providers/openlineage/utils/utils.py       |   9 +-
 .../amazon/aws/operators/test_redshift_sql.py      | 220 -
 3 files changed, 140 insertions(+), 98 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/redshift_sql.py b/airflow/providers/amazon/aws/hooks/redshift_sql.py
index 33450d61ca..8df15a8eea 100644
--- a/airflow/providers/amazon/aws/hooks/redshift_sql.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_sql.py
@@ -20,14 +20,19 @@ from functools import cached_property
 from typing import TYPE_CHECKING

 import redshift_connector
+from packaging.version import Version
 from redshift_connector import Connection as RedshiftConnection
 from sqlalchemy import create_engine
 from sqlalchemy.engine.url import URL

+from airflow import __version__ as AIRFLOW_VERSION
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.common.sql.hooks.sql import DbApiHook

+_IS_AIRFLOW_2_10_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0")
+
+
 if TYPE_CHECKING:
     from airflow.models.connection import Connection
     from airflow.providers.openlineage.sqlparser import DatabaseInfo
@@ -257,4 +262,6 @@ class RedshiftSQLHook(DbApiHook):
     def get_openlineage_default_schema(self) -> str | None:
         """Return current schema. This is usually changed with ``SEARCH_PATH`` parameter."""
-        return self.get_first("SELECT CURRENT_SCHEMA();")[0]
+        if _IS_AIRFLOW_2_10_OR_HIGHER:
+            return self.get_first("SELECT CURRENT_SCHEMA();")[0]
+        return super().get_openlineage_default_schema()
diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index a56bf58884..ed217bdc7a 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -29,7 +29,9 @@ from typing import TYPE_CHECKING, Any, Iterable
 import attrs
 from deprecated import deprecated
 from openlineage.client.utils import RedactMixin
+from packaging.version import Version

+from airflow import __version__ as AIRFLOW_VERSION
 from airflow.exceptions import AirflowProviderDeprecationWarning  # TODO: move this maybe to Airflow's logic?
 from airflow.models import DAG, BaseOperator, MappedOperator
 from airflow.providers.openlineage import conf
@@ -57,6 +59,7 @@ if TYPE_CHECKING:
 log = logging.getLogger(__name__)

 _NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+_IS_AIRFLOW_2_10_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0")


 def try_import_from_string(string: str) -> Any:
@@ -558,5 +561,7 @@ def normalize_sql(sql: str | Iterable[str]):

 def should_use_external_connection(hook) -> bool:
-    # TODO: Add checking overrides
-    return hook.__class__.__name__ not in ["SnowflakeHook", "SnowflakeSqlApiHook"]
+    # If we're at Airflow 2.10, the execution is process-isolated, so we can safely run those again.
+    if not _IS_AIRFLOW_2_10_OR_HIGHER:
+        return hook.__class__.__name__ not in ["SnowflakeHook", "SnowflakeSqlApiHook", "RedshiftSQLHook"]
+    return True
diff --git a/tests/providers/amazon/aws/operators/test_redshift_sql.py b/tests/providers/amazon/aws/operators/test_redshift_sql.py
index 586172c3b8..003c40e615 100644
--- a/tests/providers/amazon/aws/operators/test_redshift_sql.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_sql.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations

-from unittest.mock import MagicMock, call, patch
+from unittest.mock import MagicMock, PropertyMock, call, patch

 import pytest
 from openlineage.client.facet import (
@@ -31,7 +31,7 @@ from openlineage.client.facet import (
 )
 from openlineage.client.run import Dataset

 from airflow.models.connection import Connection
-from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook as OriginalRedshiftSQLHook
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

 MOCK_REGION_NAME = "eu-north-1"
@@ -40,38 +40,64 @@ MOCK_REGION_NAME = "eu-north-1"
 class TestRedshiftSQLOpenLineage:
     @patch.dict("os.environ", AIRFLOW_CONN_
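The gate in the diff above compares `Version(AIRFLOW_VERSION).base_version` against `2.10.0`, so pre-release suffixes such as `2.10.0.dev0` do not defeat the check. The same idea can be sketched without the `packaging` dependency; the parser below is a deliberate simplification (it does not handle every PEP 440 form) and is for illustration only:

```python
def base_version_tuple(version: str) -> tuple[int, ...]:
    """Very rough stand-in for packaging's Version(...).base_version:
    keep only the leading numeric release segments, dropping dev/rc
    suffixes. Not a full PEP 440 parser -- illustration only."""
    parts = []
    for piece in version.split("."):
        digits = ""
        for ch in piece:
            if ch.isdigit():
                digits += ch
            else:
                break
        if not digits:          # hit a non-numeric segment like "dev0"
            break
        parts.append(int(digits))
    return tuple(parts)

def is_airflow_2_10_or_higher(version: str) -> bool:
    # Tuple comparison gives the right ordering: (2, 9, 2) < (2, 10, 0).
    return base_version_tuple(version) >= (2, 10, 0)

print(is_airflow_2_10_or_higher("2.10.0.dev0"))  # True
print(is_airflow_2_10_or_higher("2.9.2"))        # False
```

Comparing tuples of integers avoids the classic string-comparison trap where `"2.9" > "2.10"`.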
(airflow) branch openlineage-process-execution updated (69865d61ef -> 7c13f5d4ea)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git omit 69865d61ef openlineage: execute extraction and message sending in separate process add 7c13f5d4ea openlineage: execute extraction and message sending in separate process This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (69865d61ef) \ N -- N -- N refs/heads/openlineage-process-execution (7c13f5d4ea) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(airflow) branch openlineage-process-execution updated (0e2172dfa6 -> 69865d61ef)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git omit 0e2172dfa6 openlineage: execute extraction and message sending in separate process add 69865d61ef openlineage: execute extraction and message sending in separate process Summary of changes: tests/providers/openlineage/plugins/test_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(airflow) branch openlineage-process-execution updated (44ba855e1e -> 0e2172dfa6)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git discard 44ba855e1e openlineage: execute extraction and message sending in separate process discard f0128ced4c local task job: add timeout, to not kill on_task_instance_success listener prematurely add f5c8059553 sftp_sensor: fixing resource management with sensor (#40022) add 86e613029b Implement CloudComposerDAGRunSensor (#40088) add b49db97b8e Fix param order in PythonOperator docstring (#40122) add 44a5300594 Chart: enable mysql keda support for triggerer (#37365) add 65dbf86f72 Fix openai 1.32 breaking openai tests (#40110) add 85be1867a6 Add warning to deprecated api docs that access control isn't applied (#40129) add 1a61eb3afd feat: OpenSearchQueryOperator using an endpoint with a self-signed certificate (#39788) add 27579cb773 Upgrade to codecov@v4 (#40128) add 22bd188439 Fix `importlib_metadata` import in aws utils (#40134) add 62444d8cff Resolve openlineage provider deprecations in tests (#40133) add 663d422588 Add Codecov token (#40136) add 3c8cd1cec2 Properly pass codecov-token to composite action (#40138) add e01877a162 Remove warnings when uploading code coverage (#40139) add 7a254ff240 Add rpc server to helm chart (#38549) add 15178b6953 Use stdlib `importlib.metadata` for retrieve `botocore` package version (#40137) add 0478e24957 Ignore exceptions when closing all sessions in tests (#40143) add 48b619078d Mark two tests as (trigger and deps) as flaky (#40144) add 340d6b0dde Fix highlight of example code in dags.rst (#40114) add cbe6c2dd24 Add `delete_topic` to `KafkaAdminClientHook` and teardown logic to Kafka integration tests (#40142) add 297ad80c7d Move Post Execution Log Grouping behind Exception Print (#40146) add 14a613fc7d Allow executors to be specified with only the class name of the Executor (#40131) add fc4fbb3dcf Update providers metadata 
2024-06-09 (#40149) add f7708acab9 Fix hive_partition_sensor system test (#40023) add 072c63b539 Fix typo in providers_manager.py (#40157) add aabe66f869 Chart: Default airflow version to 2.9.2 (#40160) add e9d8222a19 Airflow 2.9.2 has been released (#40159) add 3d4661dac3 local task job: add timeout, to not kill on_task_instance_success listener prematurely add 0e2172dfa6 openlineage: execute extraction and message sending in separate process
Summary of changes: .github/ISSUE_TEMPLATE/airflow_bug_report.yml | 2 +- .github/actions/post_tests_success/action.yml | 21 +- .github/workflows/integration-tests.yml| 3 + .github/workflows/run-unit-tests.yml | 3 + Dockerfile | 5 +- Dockerfile.ci | 6 +- README.md | 26 +- RELEASE_NOTES.rst | 61 ++ airflow/config_templates/config.yml| 4 +- airflow/executors/executor_loader.py | 7 +- airflow/models/taskinstance.py | 6 +- airflow/operators/python.py| 4 +- airflow/providers/amazon/aws/utils/__init__.py | 5 +- airflow/providers/apache/kafka/hooks/base.py | 3 +- airflow/providers/apache/kafka/hooks/client.py | 16 + .../google/cloud/operators/dataproc_metastore.py | 23 +- .../google/cloud/sensors/cloud_composer.py | 173 - .../google/cloud/triggers/cloud_composer.py| 115 airflow/providers/google/provider.yaml | 5 +- airflow/providers/openai/provider.yaml | 2 +- airflow/providers/opensearch/hooks/opensearch.py | 21 +- .../providers/opensearch/operators/opensearch.py | 12 +- airflow/providers/sftp/hooks/sftp.py | 10 +- airflow/providers/sftp/sensors/sftp.py | 13 + airflow/providers_manager.py | 4 +- airflow/reproducible_build.yaml| 4
(airflow) branch listener-task-timeout updated (f0128ced4c -> 3d4661dac3)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch listener-task-timeout in repository https://gitbox.apache.org/repos/asf/airflow.git omit f0128ced4c local task job: add timeout, to not kill on_task_instance_success listener prematurely add f5c8059553 sftp_sensor: fixing resource management with sensor (#40022) add 86e613029b Implement CloudComposerDAGRunSensor (#40088) add b49db97b8e Fix param order in PythonOperator docstring (#40122) add 44a5300594 Chart: enable mysql keda support for triggerer (#37365) add 65dbf86f72 Fix openai 1.32 breaking openai tests (#40110) add 85be1867a6 Add warning to deprecated api docs that access control isn't applied (#40129) add 1a61eb3afd feat: OpenSearchQueryOperator using an endpoint with a self-signed certificate (#39788) add 27579cb773 Upgrade to codecov@v4 (#40128) add 22bd188439 Fix `importlib_metadata` import in aws utils (#40134) add 62444d8cff Resolve openlineage provider deprecations in tests (#40133) add 663d422588 Add Codecov token (#40136) add 3c8cd1cec2 Properly pass codecov-token to composite action (#40138) add e01877a162 Remove warnings when uploading code coverage (#40139) add 7a254ff240 Add rpc server to helm chart (#38549) add 15178b6953 Use stdlib `importlib.metadata` for retrieve `botocore` package version (#40137) add 0478e24957 Ignore exceptions when closing all sessions in tests (#40143) add 48b619078d Mark two tests as (trigger and deps) as flaky (#40144) add 340d6b0dde Fix highlight of example code in dags.rst (#40114) add cbe6c2dd24 Add `delete_topic` to `KafkaAdminClientHook` and teardown logic to Kafka integration tests (#40142) add 297ad80c7d Move Post Execution Log Grouping behind Exception Print (#40146) add 14a613fc7d Allow executors to be specified with only the class name of the Executor (#40131) add fc4fbb3dcf Update providers metadata 2024-06-09 (#40149) add f7708acab9 Fix hive_partition_sensor system test (#40023) add 072c63b539 Fix typo 
in providers_manager.py (#40157) add aabe66f869 Chart: Default airflow version to 2.9.2 (#40160) add e9d8222a19 Airflow 2.9.2 has been released (#40159) add 3d4661dac3 local task job: add timeout, to not kill on_task_instance_success listener prematurely Summary of changes: .github/ISSUE_TEMPLATE/airflow_bug_report.yml | 2 +- .github/actions/post_tests_success/action.yml | 21 +- .github/workflows/integration-tests.yml| 3 + .github/workflows/run-unit-tests.yml | 3 + Dockerfile | 5 +- Dockerfile.ci | 6 +- README.md | 26 +- RELEASE_NOTES.rst | 61 ++ airflow/config_templates/config.yml| 4 +- airflow/executors/executor_loader.py | 7 +- airflow/models/taskinstance.py | 6 +- airflow/operators/python.py| 4 +- airflow/providers/amazon/aws/utils/__init__.py | 5 +- airflow/providers/apache/kafka/hooks/base.py | 3 +- airflow/providers/apache/kafka/hooks/client.py | 16 + .../google/cloud/operators/dataproc_metastore.py | 23 +- .../google/cloud/sensors/cloud_composer.py | 173 - .../google/cloud/triggers/cloud_composer.py| 115 airflow/providers/google/provider.yaml | 5 +- airflow/providers/openai/provider.yaml | 2 +- airflow/providers/opensearch/hooks/opensearch.py | 21 +- .../providers/opensearch/operators/opensearch.py | 12 +- airflow/providers/sftp/hooks/sftp.py | 10 
+- airflow/providers/sftp/sensors/sftp.py | 13 + airflow/providers_manager.py | 4 +- airflow/reproducible_build.yaml| 4 +- chart/Chart.yaml | 20 +- chart/newsfragments/39433.significant.rst | 3 - chart/newsfragments/40160.significant.rst | 3 + chart/tem
(airflow) branch openlineage-process-execution updated (2c96acba20 -> 44ba855e1e)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git discard 2c96acba20 openlineage: execute extraction and message sending in separate process add 44ba855e1e openlineage: execute extraction and message sending in separate process Summary of changes: tests/providers/openlineage/plugins/test_execution.py | 2 ++ 1 file changed, 2 insertions(+)
(airflow) branch openlineage-process-execution updated (a5ac1a4b48 -> 2c96acba20)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git discard a5ac1a4b48 openlineage: execute extraction and message sending in separate process discard ec1b19417c local task job: add timeout, to not kill on_task_instance_success listener prematurely add b03aa6caa0 Fix grammar in dags.rst (#40085) add fe9159635f google: move openlineage imports inside methods (#40062) add f2a5b1dee1 feat(providers/google): add default gcp_conn_id to GoogleBaseAsyncHook (#40080) add b358d0a6de Add `retry_args` parameter to `HttpOperator` (#40086) add 0776bdbad1 Resolve google cloud kubernetes operator and trigger deprecations in tests (#40072) add 1a613030e6 Add task SLA and queued datetime information to AirflowRunFacet (#40091) add a31b10edda fix typo in example_params_ui_tutorial (#40094) add 42a2b1a3bb Fix aws assume role session creation when deferrable (#40051) add 0568e9a604 Exit instead of return in case of Providers[fab] test is ignored (#40089) add 0e8d823a9f Remove apt-transport-https because It is a dummy transitional package(#40015) add 00a2843649 Include fatal reason for pod pending events (#39924) add 013523e5d3 Fix credentials intialization revealed by mypy version of google auth (#40108) add 5bbd70cb45 Import aiobotocore locally in tests rather than as global import (#40109) add 8daa53eaa5 Avoid resetting adopted task instances when retrying for kubernetes executor (#39406) add 80522b91bb catch sentry flush if exception happens in _execute_in_fork finally block (#40060) add 4723dbe99a Use a join for TI notes in TI batch API endpoint (#40028) add a3fc50d00b Remove obsolete conditional logic related to try_number (#40104) add 35bece7dc3 Prepare docs 1st wave June 2024 (#40057) add 0c51bd6ab6 Upgrade to latest installers (#40107) add f0128ced4c local task job: add timeout, to not kill on_task_instance_success listener 
prematurely add 2c96acba20 openlineage: execute extraction and message sending in separate process Summary of changes: Dockerfile | 2 +- Dockerfile.ci | 6 +- .../endpoints/task_instance_endpoint.py| 4 +- airflow/example_dags/example_params_ui_tutorial.py | 6 +- airflow/models/taskinstance.py | 8 - airflow/providers/amazon/CHANGELOG.rst | 18 ++ airflow/providers/amazon/aws/hooks/base_aws.py | 11 +- airflow/providers/celery/CHANGELOG.rst | 14 ++ airflow/providers/celery/__init__.py | 2 +- .../celery/executors/celery_executor_utils.py | 8 +- airflow/providers/celery/provider.yaml | 3 +- airflow/providers/cncf/kubernetes/CHANGELOG.rst| 21 ++ airflow/providers/cncf/kubernetes/__init__.py | 2 +- .../kubernetes/executors/kubernetes_executor.py| 18 +- .../executors/kubernetes_executor_utils.py | 9 +- airflow/providers/cncf/kubernetes/provider.yaml| 3 +- .../hooks/vertex_ai/hyperparameter_tuning_job.py | 2 +- .../cloud/openlineage/{utils.py => mixins.py} | 187 -- .../providers/google/cloud/openlineage/utils.py| 219 + .../providers/google/cloud/operators/bigquery.py | 3 +- .../google/cloud/utils/credentials_provider.py | 19 +- .../providers/google/common/hooks/base_google.py | 6 +- .../google/common/utils/id_token_credentials.py| 2 +- 
airflow/providers/http/operators/http.py | 18 +- .../openlineage/facets/AirflowRunFacet.json| 7 + airflow/providers/openlineage/utils/utils.py | 5 +- docs/apache-airflow-providers-amazon/commits.rst | 20 +- docs/apache-airflow-providers-amazon/index.rst | 10 +- docs/apache-airflow-providers-celery/commits.rst | 16 +- docs/apache-airflow-providers-celery/index.rst | 6 +- .../commits.rst| 20 +- .../index.rst
(airflow) branch listener-task-timeout updated (ec1b19417c -> f0128ced4c)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch listener-task-timeout in repository https://gitbox.apache.org/repos/asf/airflow.git omit ec1b19417c local task job: add timeout, to not kill on_task_instance_success listener prematurely add b03aa6caa0 Fix grammar in dags.rst (#40085) add fe9159635f google: move openlineage imports inside methods (#40062) add f2a5b1dee1 feat(providers/google): add default gcp_conn_id to GoogleBaseAsyncHook (#40080) add b358d0a6de Add `retry_args` parameter to `HttpOperator` (#40086) add 0776bdbad1 Resolve google cloud kubernetes operator and trigger deprecations in tests (#40072) add 1a613030e6 Add task SLA and queued datetime information to AirflowRunFacet (#40091) add a31b10edda fix typo in example_params_ui_tutorial (#40094) add 42a2b1a3bb Fix aws assume role session creation when deferrable (#40051) add 0568e9a604 Exit instead of return in case of Providers[fab] test is ignored (#40089) add 0e8d823a9f Remove apt-transport-https because It is a dummy transitional package(#40015) add 00a2843649 Include fatal reason for pod pending events (#39924) add 013523e5d3 Fix credentials intialization revealed by mypy version of google auth (#40108) add 5bbd70cb45 Import aiobotocore locally in tests rather than as global import (#40109) add 8daa53eaa5 Avoid resetting adopted task instances when retrying for kubernetes executor (#39406) add 80522b91bb catch sentry flush if exception happens in _execute_in_fork finally block (#40060) add 4723dbe99a Use a join for TI notes in TI batch API endpoint (#40028) add a3fc50d00b Remove obsolete conditional logic related to try_number (#40104) add 35bece7dc3 Prepare docs 1st wave June 2024 (#40057) add 0c51bd6ab6 Upgrade to latest installers (#40107) add f0128ced4c local task job: add timeout, to not kill on_task_instance_success listener prematurely Summary of changes: Dockerfile | 2 +- Dockerfile.ci | 6 +- .../endpoints/task_instance_endpoint.py| 4 +- airflow/example_dags/example_params_ui_tutorial.py | 6 +- airflow/models/taskinstance.py | 8 - airflow/providers/amazon/CHANGELOG.rst | 18 ++ airflow/providers/amazon/aws/hooks/base_aws.py | 11 +- airflow/providers/celery/CHANGELOG.rst | 14 ++ airflow/providers/celery/__init__.py | 2 +- .../celery/executors/celery_executor_utils.py | 8 +- airflow/providers/celery/provider.yaml | 3 +- airflow/providers/cncf/kubernetes/CHANGELOG.rst| 21 ++ airflow/providers/cncf/kubernetes/__init__.py | 2 +- .../kubernetes/executors/kubernetes_executor.py| 18 +- .../executors/kubernetes_executor_utils.py | 9 +- airflow/providers/cncf/kubernetes/provider.yaml| 3 +- .../hooks/vertex_ai/hyperparameter_tuning_job.py | 2 +- .../cloud/openlineage/{utils.py => mixins.py} | 183 - .../providers/google/cloud/openlineage/utils.py| 219 + .../providers/google/cloud/operators/bigquery.py | 3 +- .../google/cloud/utils/credentials_provider.py | 19 +- .../providers/google/common/hooks/base_google.py | 6 +- .../google/common/utils/id_token_credentials.py| 2 +- airflow/providers/http/operators/http.py | 18 +- .../openlineage/facets/AirflowRunFacet.json| 7 + airflow/providers/openlineage/utils/utils.py | 5 +- docs/apache-airflow-providers-amazon/commits.rst 
| 20 +- docs/apache-airflow-providers-amazon/index.rst | 10 +- docs/apache-airflow-providers-celery/commits.rst | 16 +- docs/apache-airflow-providers-celery/index.rst | 6 +- .../commits.rst| 20 +- .../index.rst | 4 +- docs/apache-airflow/core-concepts/dags.rst | 6 +- docs/apache-airflow/installation/dependencies.rst | 4 +- scripts/docker/entrypoint_ci.sh| 2 +- tests/
(airflow) branch openlineage-process-execution updated (3b90e01262 -> a5ac1a4b48)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git omit 3b90e01262 openlineage: execute extraction and message sending in separate process add a5ac1a4b48 openlineage: execute extraction and message sending in separate process Summary of changes: tests/providers/openlineage/plugins/test_execution.py | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-)
(airflow) branch openlineage-process-execution updated (23849615a9 -> 3b90e01262)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git omit 23849615a9 openlineage: execute extraction and message sending in separate process add 3b90e01262 openlineage: execute extraction and message sending in separate process Summary of changes: airflow/providers/openlineage/conf.py | 1 + airflow/providers/openlineage/plugins/listener.py | 12 ++- .../providers/openlineage/plugins/test_adapter.py | 42 .../providers/openlineage/plugins/test_listener.py | 115 ++--- .../openlineage/plugins/test_openlineage.py| 9 -- tests/providers/openlineage/test_conf.py | 25 - 6 files changed, 67 insertions(+), 137 deletions(-)
(airflow) branch openlineage-process-execution updated (449dd3c671 -> 2a1c57a346)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git discard 449dd3c671 openlineage: execute extraction and message sending in separate process new 2a1c57a346 openlineage: execute extraction and message sending in separate process The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: airflow/providers/common/sql/operators/sql.py | 6 - .../providers/google/cloud/openlineage/utils.py| 4 + airflow/providers/openlineage/conf.py | 16 +- airflow/providers/openlineage/plugins/listener.py | 16 +- airflow/providers/openlineage/provider.yaml| 11 +- airflow/providers/openlineage/sqlparser.py | 20 +-- airflow/providers/openlineage/utils/sql.py | 12 +- airflow/providers/openlineage/utils/utils.py | 3 +- generated/provider_dependencies.json | 4 +- tests/dags/test_openlineage_execution.py | 60 +++ .../openlineage/plugins/test_execution.py | 187 + 11 files changed, 298 insertions(+), 41 deletions(-) create mode 100644 tests/dags/test_openlineage_execution.py create mode 100644 tests/providers/openlineage/plugins/test_execution.py
(airflow) branch openlineage-process-execution updated (2a1c57a346 -> 23849615a9)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git discard 2a1c57a346 openlineage: execute extraction and message sending in separate process discard e2acd51121 local task job: add timeout, to not kill on_task_instance_success listener prematurely add 914bccc387 Improve trigger UI for string array format validation (#39993) add 1ec3b39f28 Resolve www views deprecations in tests (#40009) add 32cf8cf37d Update index.rst (#40040) add ea682382bc Adding Glue Data Quality Rule Recommendation Run (#40014) add 21dae6ee39 Resolve pagerduty deprecations in tests (#39945) add f6148e6e33 Update dataplex.py (#40041) add 5d227860ad Fix `DbtCloudRunJobOperator` to Use Correct Status Parameters for `reuse_existing_run` (#40048) add 99dd24b436 Doc-only: mention minimum boto3 1.34.52 for AWS provider when using Batch `ecs_properties_override` (#39983) add 83c118413c Add `retry_from_failure` parameter to DbtCloudRunJobOperator (#38868) add 06a200d5fa Bump boto min versions (#40052) add 0f6e31f4b9 Fix reattach_on_restart parameter for the sync mode (#39329) add 73d02e0f21 Add PID and return code to _execute_in_fork logging (#40058) add c6f85f08f8 Set PR #39638 for Airflow 2.10 (#40044) add c202c07f67 Introduce AirflowJobFacet and AirflowStateRunFacet (#39520) add 07982b0e83 Add @utkarsharma2 in commiter list. 
(#40065) add 9bcf665570 Fix typos in overview.rst (#40061) add ac2a1ef81d Expllicitly skip API tests for compatiblity tests on 2.7 and 2.8 (#40059) add ef1c9a2a2b Chart: Allow AWS Executors (#38524) add e7f11b58b6 Resolve databricks deprecations in tests (#40068) add 4dea367047 Add unit test to cover back compat case in celery (#40035) add fcb9f87c55 fix typo and path to licences folder (#40067) add 16e0182025 Docs: Simpler command to check local scheduler is alive (#40074) add f28ee5a694 Resolve core operators deprecations in tests (#39992) add 45bf7b9721 Resolve dagbag deprecations in tests (#39989) add ec1b19417c local task job: add timeout, to not kill on_task_instance_success listener prematurely new 23849615a9 openlineage: execute extraction and message sending in separate process 
Summary of changes: Dockerfile.ci | 6 + LICENSE| 8 +- airflow/_vendor/README.md | 7 +- ...exes_on_dag_id_column_in_referencing_tables.py} | 2 +- airflow/providers/amazon/aws/hooks/glue.py | 23 ++ airflow/providers/amazon/aws/operators/bedrock.py | 26 +- airflow/providers/amazon/aws/operators/glue.py | 146 +++ airflow/providers/amazon/aws/sensors/glue.py | 125 +- airflow/providers/amazon/aws/triggers/glue.py | 35 ++ airflow/providers/amazon/aws/waiters/glue.json | 49 +++ airflow/providers/amazon/provider.yaml | 16 +- .../celery/executors/celery_executor_utils.py | 2 +- airflow/providers/cncf/kubernetes/operators/pod.py | 57 ++- airflow/providers/dbt/cloud/hooks/dbt.py | 33 ++ airflow/providers/dbt/cloud/operators/dbt.py | 9 +- .../providers/google/cloud/operators/dataplex.py | 2 +- .../openlineage/facets/AirflowJobFacet.json| 40 ++ .../openlineage/facets/AirflowRunFacet.json| 254 .../facets/AirflowStateRunFacet.json} | 25 +- .../openlineage/facets}/__init__.py| 0 airflow/providers/openlineage/plugins/adapter.py | 20 +- airflow/providers/openlineage/plugins/facets.py| 47 ++- airflow/providers/openlineage/plugins/listener.py | 4 + airflow/providers/openlineage/utils/utils.py | 186 - airflow/utils/db.py| 2 +- airflow/www/templates/airf
(airflow) 01/01: openlineage: execute extraction and message sending in separate process
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git commit 23849615a93da2dafd9a9fa76af456fe9b69de2a Author: Maciej Obuchowski AuthorDate: Fri May 31 14:47:28 2024 +0200 openlineage: execute extraction and message sending in separate process Signed-off-by: Maciej Obuchowski --- .../providers/google/cloud/openlineage/utils.py| 4 + airflow/providers/openlineage/conf.py | 16 +- airflow/providers/openlineage/plugins/listener.py | 40 - airflow/providers/openlineage/provider.yaml| 11 +- airflow/providers/openlineage/sqlparser.py | 16 +- airflow/providers/openlineage/utils/sql.py | 6 + airflow/providers/snowflake/hooks/snowflake.py | 6 +- generated/provider_dependencies.json | 4 +- tests/dags/test_openlineage_execution.py | 60 +++ .../openlineage/plugins/test_execution.py | 187 + 10 files changed, 335 insertions(+), 15 deletions(-) diff --git a/airflow/providers/google/cloud/openlineage/utils.py b/airflow/providers/google/cloud/openlineage/utils.py index fb0d4c663b..986bf62f62 100644 --- a/airflow/providers/google/cloud/openlineage/utils.py +++ b/airflow/providers/google/cloud/openlineage/utils.py @@ -163,9 +163,13 @@ def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None: if not result: return None """ +# chain.pop modifies passed list, this can be unexpected +chain = chain.copy() chain.reverse() try: while chain: +while isinstance(source, list) and len(source) == 1: +source = source[0] next_key = chain.pop() if isinstance(source, dict): source = source.get(next_key) diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index a9601a416b..887a931f5e 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -33,7 +33,14 @@ from __future__ import annotations import os from typing import Any -from airflow.compat.functools import cache +if 
os.getenv("PYTEST_VERSION"):
+
+    def decorator(func):
+        return func
+
+    cache = decorator
+else:
+    from airflow.compat.functools import cache
 from airflow.configuration import conf
 
 _CONFIG_SECTION = "openlineage"
@@ -130,3 +137,10 @@ def dag_state_change_process_pool_size() -> int:
     """[openlineage] dag_state_change_process_pool_size."""
     option = conf.get(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback="")
     return _safe_int_convert(str(option).strip(), default=1)
+
+
+@cache
+def execution_timeout() -> int:
+    """[openlineage] execution_timeout."""
+    option = conf.get(_CONFIG_SECTION, "execution_timeout", fallback="")
+    return _safe_int_convert(str(option).strip(), default=10)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 728159a795..f103f36526 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,12 +17,15 @@
 from __future__ import annotations
 
 import logging
+import os
 from concurrent.futures import ProcessPoolExecutor
 from datetime import datetime
 from typing import TYPE_CHECKING
 
+import psutil
 from openlineage.client.serde import Serde
 from packaging.version import Version
+from setproctitle import getproctitle, setproctitle
 
 from airflow import __version__ as AIRFLOW_VERSION, settings
 from airflow.listeners import hookimpl
@@ -38,6 +41,7 @@ from airflow.providers.openlineage.utils.utils import (
     is_selective_lineage_enabled,
     print_warning,
 )
+from airflow.settings import configure_orm
 from airflow.stats import Stats
 from airflow.utils.timeout import timeout
@@ -156,7 +160,7 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
-        on_running()
+        self._fork_execute(on_running, "on_running")
 
     @hookimpl
     def on_task_instance_success(
@@ -223,7 +227,7 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
-        on_success()
+        self._fork_execute(on_success, "on_success")
 
     if _IS_AIRFLOW_2_10_OR_HIGHER:
@@ -318,10 +322,40 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
-        on_failure()
+        self._fork_execute(on_failure, "on_failure")
+
+    def _fork_execute(self, callable, callable_name: str):
+
(airflow) branch listener-task-timeout updated (e2acd51121 -> ec1b19417c)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch listener-task-timeout in repository https://gitbox.apache.org/repos/asf/airflow.git omit e2acd51121 local task job: add timeout, to not kill on_task_instance_success listener prematurely add 914bccc387 Improve trigger UI for string array format validation (#39993) add 1ec3b39f28 Resolve www views deprecations in tests (#40009) add 32cf8cf37d Update index.rst (#40040) add ea682382bc Adding Glue Data Quality Rule Recommendation Run (#40014) add 21dae6ee39 Resolve pagerduty deprecations in tests (#39945) add f6148e6e33 Update dataplex.py (#40041) add 5d227860ad Fix `DbtCloudRunJobOperator` to Use Correct Status Parameters for `reuse_existing_run` (#40048) add 99dd24b436 Doc-only: mention minimum boto3 1.34.52 for AWS provider when using Batch `ecs_properties_override` (#39983) add 83c118413c Add `retry_from_failure` parameter to DbtCloudRunJobOperator (#38868) add 06a200d5fa Bump boto min versions (#40052) add 0f6e31f4b9 Fix reattach_on_restart parameter for the sync mode (#39329) add 73d02e0f21 Add PID and return code to _execute_in_fork logging (#40058) add c6f85f08f8 Set PR #39638 for Airflow 2.10 (#40044) add c202c07f67 Introduce AirflowJobFacet and AirflowStateRunFacet (#39520) add 07982b0e83 Add @utkarsharma2 in commiter list. 
(#40065) add 9bcf665570 Fix typos in overview.rst (#40061) add ac2a1ef81d Expllicitly skip API tests for compatiblity tests on 2.7 and 2.8 (#40059) add ef1c9a2a2b Chart: Allow AWS Executors (#38524) add e7f11b58b6 Resolve databricks deprecations in tests (#40068) add 4dea367047 Add unit test to cover back compat case in celery (#40035) add fcb9f87c55 fix typo and path to licences folder (#40067) add 16e0182025 Docs: Simpler command to check local scheduler is alive (#40074) add f28ee5a694 Resolve core operators deprecations in tests (#39992) add 45bf7b9721 Resolve dagbag deprecations in tests (#39989) add ec1b19417c local task job: add timeout, to not kill on_task_instance_success listener prematurely This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (e2acd51121) \ N -- N -- N refs/heads/listener-task-timeout (ec1b19417c) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. 
Summary of changes: Dockerfile.ci | 6 + LICENSE| 8 +- airflow/_vendor/README.md | 7 +- ...exes_on_dag_id_column_in_referencing_tables.py} | 2 +- airflow/providers/amazon/aws/hooks/glue.py | 23 ++ airflow/providers/amazon/aws/operators/bedrock.py | 26 +- airflow/providers/amazon/aws/operators/glue.py | 146 +++ airflow/providers/amazon/aws/sensors/glue.py | 125 +- airflow/providers/amazon/aws/triggers/glue.py | 35 ++ airflow/providers/amazon/aws/waiters/glue.json | 49 +++ airflow/providers/amazon/provider.yaml | 16 +- .../celery/executors/celery_executor_utils.py | 2 +- airflow/providers/cncf/kubernetes/operators/pod.py | 57 ++- airflow/providers/dbt/cloud/hooks/dbt.py | 33 ++ airflow/providers/dbt/cloud/operators/dbt.py | 9 +- .../providers/google/cloud/operators/dataplex.py | 2 +- .../openlineage/facets/AirflowJobFacet.json| 40 ++ .../openlineage/facets/AirflowRunFacet.json| 254 .../facets/AirflowStateRunFacet.json} | 25 +- .../openlineage/facets}/__init__.py| 0 airflow/providers/openlineage/plugins/adapter.py | 20 +- airflow/providers/openlineage/plugins/facets.py| 47 ++- airflow/providers/openlineage/plugins/listener.py | 4 + airflow/providers/openlineage/utils/utils.py | 186 - airflow/utils/db.py| 2 +- airflow/www/templates/airflow/trigger.html | 2 +- chart/README.md| 3 + chart/values.schema.json | 4 +- dev/breeze/doc/images/output_build-docs.svg| 28 +- dev/breeze/doc/images/output_build-docs.txt| 2 +- ...elease-management_prepare-provider-packages.svg | 26 +- ...elease-management_prepare-provider-packages.txt | 2 +-
(airflow) 01/01: openlineage: execute extraction and message sending in separate process
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git commit 2a1c57a34621a848e75d5f3642e9c60936fcee9b Author: Maciej Obuchowski AuthorDate: Fri May 31 14:47:28 2024 +0200 openlineage: execute extraction and message sending in separate process Signed-off-by: Maciej Obuchowski --- .../providers/google/cloud/openlineage/utils.py| 4 + airflow/providers/openlineage/conf.py | 16 +- airflow/providers/openlineage/plugins/listener.py | 40 - airflow/providers/openlineage/provider.yaml| 11 +- airflow/providers/openlineage/sqlparser.py | 16 +- airflow/providers/openlineage/utils/sql.py | 6 + airflow/providers/snowflake/hooks/snowflake.py | 6 +- generated/provider_dependencies.json | 4 +- tests/dags/test_openlineage_execution.py | 60 +++ .../openlineage/plugins/test_execution.py | 187 + 10 files changed, 335 insertions(+), 15 deletions(-) diff --git a/airflow/providers/google/cloud/openlineage/utils.py b/airflow/providers/google/cloud/openlineage/utils.py index fb0d4c663b..986bf62f62 100644 --- a/airflow/providers/google/cloud/openlineage/utils.py +++ b/airflow/providers/google/cloud/openlineage/utils.py @@ -163,9 +163,13 @@ def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None: if not result: return None """ +# chain.pop modifies passed list, this can be unexpected +chain = chain.copy() chain.reverse() try: while chain: +while isinstance(source, list) and len(source) == 1: +source = source[0] next_key = chain.pop() if isinstance(source, dict): source = source.get(next_key) diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index a9601a416b..887a931f5e 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -33,7 +33,14 @@ from __future__ import annotations import os from typing import Any -from airflow.compat.functools import cache +if 
os.getenv("PYTEST_VERSION"):
+
+    def decorator(func):
+        return func
+
+    cache = decorator
+else:
+    from airflow.compat.functools import cache
 from airflow.configuration import conf
 
 _CONFIG_SECTION = "openlineage"
@@ -130,3 +137,10 @@ def dag_state_change_process_pool_size() -> int:
     """[openlineage] dag_state_change_process_pool_size."""
     option = conf.get(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback="")
     return _safe_int_convert(str(option).strip(), default=1)
+
+
+@cache
+def execution_timeout() -> int:
+    """[openlineage] execution_timeout."""
+    option = conf.get(_CONFIG_SECTION, "execution_timeout", fallback="")
+    return _safe_int_convert(str(option).strip(), default=10)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index e07c5507d8..4a2bff5743 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,12 +17,15 @@
 from __future__ import annotations
 
 import logging
+import os
 from concurrent.futures import ProcessPoolExecutor
 from datetime import datetime
 from typing import TYPE_CHECKING
 
+import psutil
 from openlineage.client.serde import Serde
 from packaging.version import Version
+from setproctitle import getproctitle, setproctitle
 
 from airflow import __version__ as AIRFLOW_VERSION, settings
 from airflow.listeners import hookimpl
@@ -37,6 +40,7 @@ from airflow.providers.openlineage.utils.utils import (
     is_selective_lineage_enabled,
     print_warning,
 )
+from airflow.settings import configure_orm
 from airflow.stats import Stats
 from airflow.utils.timeout import timeout
@@ -155,7 +159,7 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
-        on_running()
+        self._fork_execute(on_running, "on_running")
 
     @hookimpl
     def on_task_instance_success(
@@ -222,7 +226,7 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
-        on_success()
+        self._fork_execute(on_success, "on_success")
 
     if _IS_AIRFLOW_2_10_OR_HIGHER:
@@ -317,10 +321,40 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
-        on_failure()
+        self._fork_execute(on_failure, "on_failure")
+
+    def _fork_execute(self, callable, callable_name: str):
+
(airflow) branch main updated: Introduce AirflowJobFacet and AirflowStateRunFacet (#39520)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new c202c07f67 Introduce AirflowJobFacet and AirflowStateRunFacet (#39520) c202c07f67 is described below commit c202c07f67173718c736d95de22185b65b25b580 Author: Kacper Muda AuthorDate: Wed Jun 5 13:52:13 2024 +0200 Introduce AirflowJobFacet and AirflowStateRunFacet (#39520) Signed-off-by: Kacper Muda --- .../openlineage/facets/AirflowJobFacet.json| 40 ++ .../openlineage/facets/AirflowRunFacet.json| 254 .../openlineage/facets/AirflowStateRunFacet.json | 34 ++ airflow/providers/openlineage/facets/__init__.py | 16 + airflow/providers/openlineage/plugins/adapter.py | 20 +- airflow/providers/openlineage/plugins/facets.py| 47 ++- airflow/providers/openlineage/plugins/listener.py | 4 + airflow/providers/openlineage/utils/utils.py | 186 - docs/spelling_wordlist.txt | 4 + .../providers/openlineage/plugins/test_adapter.py | 129 -- tests/providers/openlineage/utils/test_utils.py| 449 - 11 files changed, 1115 insertions(+), 68 deletions(-) diff --git a/airflow/providers/openlineage/facets/AirflowJobFacet.json b/airflow/providers/openlineage/facets/AirflowJobFacet.json new file mode 100644 index 00..51a9954de3 --- /dev/null +++ b/airflow/providers/openlineage/facets/AirflowJobFacet.json @@ -0,0 +1,40 @@ +{ +"$schema": "https://json-schema.org/draft/2020-12/schema;, +"$defs": { + "AirflowJobFacet": { +"allOf": [ + { +"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet; + }, + { +"type": "object", +"properties": { + "taskTree": { +"description": "The hierarchical structure of tasks in the DAG.", +"type": "object", +"additionalProperties": true + }, + "taskGroups": { +"description": "Information about all task groups within the DAG.", +"type": "object", +"additionalProperties": true + }, + "tasks": { +"description": 
"Details of all individual tasks within the DAG.", +"type": "object", +"additionalProperties": true + } +}, +"required": ["taskTree", "taskGroups", "tasks"] + } +], +"type": "object" + } +}, +"type": "object", +"properties": { + "airflow": { +"$ref": "#/$defs/AirflowJobFacet" + } +} + } diff --git a/airflow/providers/openlineage/facets/AirflowRunFacet.json b/airflow/providers/openlineage/facets/AirflowRunFacet.json new file mode 100644 index 00..504fb1bc3a --- /dev/null +++ b/airflow/providers/openlineage/facets/AirflowRunFacet.json @@ -0,0 +1,254 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema;, + "$defs": { +"AirflowRunFacet": { + "allOf": [ +{ + "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet; +}, +{ + "type": "object", + "properties": { +"dag": { + "$ref": "#/$defs/DAG" +}, +"dagRun": { + "$ref": "#/$defs/DagRun" +}, +"taskInstance": { + "$ref": "#/$defs/TaskInstance" +}, +"task": { + "$ref": "#/$defs/Task" +}, +"taskUuid": { + "type": "string" +} + }, + "required": [ +"dag", +"dagRun", +"taskInstance", +"task", +"taskUuid" + ] +} + ] +}, +"Task": { + "type": "object", + "properties": { +"depends_on_past": { + "type": "boolean" +}, +"downstream_task_ids": { + "
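Per the `AirflowJobFacet.json` schema introduced in this commit, a conforming facet payload must carry `taskTree`, `taskGroups`, and `tasks` as objects. An illustrative (not normative) payload and a minimal required-key check — the task names are invented for the example:

```python
# Illustrative AirflowJobFacet-shaped payload; task names are invented.
airflow_job_facet = {
    "taskTree": {"extract": {"transform": {"load": {}}}},
    "taskGroups": {},
    "tasks": {
        "extract": {"operator": "PythonOperator"},
        "transform": {"operator": "PythonOperator"},
        "load": {"operator": "PythonOperator"},
    },
}

REQUIRED = {"taskTree", "taskGroups", "tasks"}


def looks_like_airflow_job_facet(payload: dict) -> bool:
    # Checks only the schema's required top-level keys and that each is an object;
    # a real consumer would validate against the JSON Schema itself.
    return REQUIRED <= payload.keys() and all(
        isinstance(payload[key], dict) for key in REQUIRED
    )


print(looks_like_airflow_job_facet(airflow_job_facet))  # True
```

Since the schema declares `additionalProperties: true` for all three objects, their internal shape is deliberately open-ended; only the presence and object type of the three keys is required.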
(airflow) branch openlineage-process-execution created (now 449dd3c671)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git at 449dd3c671 openlineage: execute extraction and message sending in separate process This branch includes the following new commits: new 449dd3c671 openlineage: execute extraction and message sending in separate process The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(airflow) 01/01: openlineage: execute extraction and message sending in separate process
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch openlineage-process-execution in repository https://gitbox.apache.org/repos/asf/airflow.git commit 449dd3c6719781527c2e0a551a512bad7aa5a05d Author: Maciej Obuchowski AuthorDate: Fri May 31 14:47:28 2024 +0200 openlineage: execute extraction and message sending in separate process Signed-off-by: Maciej Obuchowski --- airflow/providers/common/sql/operators/sql.py | 6 airflow/providers/openlineage/plugins/listener.py | 42 --- airflow/providers/openlineage/sqlparser.py| 24 ++--- airflow/providers/openlineage/utils/sql.py| 8 + airflow/providers/openlineage/utils/utils.py | 3 +- airflow/providers/snowflake/hooks/snowflake.py| 6 ++-- 6 files changed, 76 insertions(+), 13 deletions(-) diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py index d50a6bf0f5..d8602eec93 100644 --- a/airflow/providers/common/sql/operators/sql.py +++ b/airflow/providers/common/sql/operators/sql.py @@ -309,6 +309,8 @@ class SQLExecuteQueryOperator(BaseSQLOperator): except ImportError: return None +self.log.debug("Getting Hook for OL") + hook = self.get_db_hook() try: @@ -319,6 +321,8 @@ class SQLExecuteQueryOperator(BaseSQLOperator): # OpenLineage provider release < 1.8.0 - we always use connection use_external_connection = True +self.log.error("External connection? %s", use_external_connection) + connection = hook.get_connection(getattr(hook, hook.conn_name_attr)) try: database_info = hook.get_openlineage_database_info(connection) @@ -338,6 +342,8 @@ class SQLExecuteQueryOperator(BaseSQLOperator): self.log.debug("%s failed to get database dialect", hook) return None +self.log.error("SQL result? 
%s", str(sql_parser)) + operator_lineage = sql_parser.generate_openlineage_metadata_from_sql( sql=self.sql, hook=hook, diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index e07c5507d8..57ba05fdbe 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -17,10 +17,12 @@ from __future__ import annotations import logging +import os from concurrent.futures import ProcessPoolExecutor from datetime import datetime from typing import TYPE_CHECKING +import psutil from openlineage.client.serde import Serde from packaging.version import Version @@ -37,6 +39,7 @@ from airflow.providers.openlineage.utils.utils import ( is_selective_lineage_enabled, print_warning, ) +from airflow.settings import configure_orm from airflow.stats import Stats from airflow.utils.timeout import timeout @@ -82,7 +85,7 @@ class OpenLineageListener: ) return -self.log.debug("OpenLineage listener got notification about task instance start") +self.log.debug("OpenLineage listener got notification about task instance start - fork version") dagrun = task_instance.dag_run task = task_instance.task if TYPE_CHECKING: @@ -155,7 +158,7 @@ class OpenLineageListener: len(Serde.to_json(redacted_event).encode("utf-8")), ) -on_running() +self._fork_execute(on_running, "on_running") @hookimpl def on_task_instance_success( @@ -222,7 +225,7 @@ class OpenLineageListener: len(Serde.to_json(redacted_event).encode("utf-8")), ) -on_success() +self._fork_execute(on_success, "on_success") if _IS_AIRFLOW_2_10_OR_HIGHER: @@ -317,10 +320,41 @@ class OpenLineageListener: len(Serde.to_json(redacted_event).encode("utf-8")), ) -on_failure() +self._fork_execute(on_failure, "on_failure") + +def _fork_execute(self, callable, callable_name: str): +self.log.debug("Will fork to execute OpenLineage process.") +if isinstance(callable_name, tuple): +self.log.error("WHY ITS TUPLE?") +pid = os.fork() +if pid: 
+            process = psutil.Process(pid)
+            try:
+                self.log.debug("Waiting for process %s", pid)
+                process.wait(10)
+            except psutil.TimeoutExpired:
+                self.log.warning(
+                    "OpenLineage process %s expired. This should not affect process execution.", pid
+                )
+                process.kill()
+            except BaseException:
+                # Kill the process.
+                pass
+
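The `_fork_execute` hunk above forks, then has the parent wait on the child with a 10-second cap via `psutil.Process.wait` before killing it. A stdlib-only sketch of the same parent-side pattern, using `os.waitpid` polling in place of `psutil` (the timeout value and polling interval are illustrative; POSIX only, since `os.fork` is unavailable on Windows):

```python
import os
import signal
import time


def fork_execute(callable_, timeout: float = 10.0) -> None:
    """Run callable_ in a forked child; the parent waits up to `timeout` seconds."""
    pid = os.fork()
    if pid == 0:
        # Child: run the callback and exit without running parent cleanup.
        try:
            callable_()
        finally:
            os._exit(0)
    # Parent: poll for the child; kill it if it outlives the timeout,
    # mirroring the psutil.TimeoutExpired -> process.kill() path above.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        done_pid, _ = os.waitpid(pid, os.WNOHANG)
        if done_pid == pid:
            return
        time.sleep(0.05)
    os.kill(pid, signal.SIGKILL)
    os.waitpid(pid, 0)  # reap the killed child to avoid a zombie


fork_execute(lambda: None)  # fast child: returns as soon as it exits
```

The design point is that a slow or hung OpenLineage extraction cannot hold the task process hostage: the parent bounds the wait and reaps the child either way.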
(airflow) branch listener-task-timeout updated (430c4fa120 -> e2acd51121)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch listener-task-timeout in repository https://gitbox.apache.org/repos/asf/airflow.git discard 430c4fa120 local task job: add timeout, to not kill on_task_instance_success listener prematurely add e2acd51121 local task job: add timeout, to not kill on_task_instance_success listener prematurely This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (430c4fa120) \ N -- N -- N refs/heads/listener-task-timeout (e2acd51121) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes: airflow/config_templates/config.yml | 5 +++-- airflow/jobs/local_task_job_runner.py | 2 +- tests/jobs/test_local_task_job.py | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-)
(airflow) branch listener-task-timeout updated (32e46dde86 -> 430c4fa120)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch listener-task-timeout in repository https://gitbox.apache.org/repos/asf/airflow.git discard 32e46dde86 local task job: add timeout, to not kill on_task_instance_success listener prematurely add 11f219abff Prepare docs 4th wave May 2024 (#39934) add 032d27640b Refresh properties on KubernetesPodOperator on token expiration also when logging (#39789) add 4127ce4c2d Add `drill` integration tests (#39921) add b7b6d1426f Resolve sshhook deprecations in tests (#39907) add ddcc1b3a00 Fix: remove process_func from templated_fields (#39948) add eb7826f63f Home: filter running/failed and active/paused dags (#39701) add 93ba263732 AIP-44 Optimize Pydantic Class Serialization Mapping (#39943) add b8a83b2293 Workaround new yandexcloud breaking dataproc integration (#39964) add 5aa43e2a03 Update Databricks feature checklist comparison (#39965) add 683dfb6f08 Resolve baseoperatormeta deprecations in tests (#39963) add fa0aaf694e Resolving EMR notebook deprecated warnings (#39829) add fcd1a26a9a Allow user-specified object attributes to be used in check_fn for S3KeySensor (#39950) add 78523fdbf1 Adding Amazon Glue Data Quality Service (#39923) add 8173693a70 Remove upper-binding in yandex after dataproc issue is fixed (#39974) add c8c97b5067 Fix Mark Instance state buttons stay disabled if user lacks permission (#37451). 
(#38732) add 4d5591a509 Provide extra tip on labeling DynamicTaskMapping (#39977) add 936c3892af Resolve triggerer job logging deprecations in tests (#39962) add 5137aef517 Resolve timestamp deprecations in tests (#39978) add 93e6f0070a Resolve skipmixin deprecations in tests (#39971) add b167e3dbdd Resolve weaviate deprecations in tests (#39928) add d93f965d84 Resolve backfill job deprecations in tests (#39961) add 53e6739e67 Limit yandex provider to avoid mypy errors (#39990) add 68fe455b6f Resolve taskinstance deprecations in tests (#39988) add 3af562ff53 resolving conflicts (#39734) add 44800b2fde Remove unused parameter from glboal_constants.py (#39984) add ce5cd52635 Change regular expression to exclude double quote and newline (#39991) add 21e6a49db5 resolving conflicts (#39728) add 30c82ee066 Resolve tableau deprecations in tests (#39925) add 5bd07aadd6 Resolve hashicorp deprecations in tests (#39949) add d872f7ed93 Update integration tests docs and add "Writing Integration Tests" guide (#39986) add 2bdaf44841 fix sub-heading in 07_local_virtualenv.rst (#39995) add 933d80c1aa Update dataproc.rst (#39395) add e3e450ede2 Javascript connection form will apply CodeMirror to all textarea's dynamically (#39812) add 357988831a Deduplicate model name in SageMakerTransformOperator (#39956) add bf7d5387e6 Remove rawwar from triager (#40002) add 2ee4568010 Resolve stats deprecations in tests (#39957) add ae648e6884 Pass triggered or existing DAG Run logical date to DagStateTrigger (#39960) add 7ceb3d6778 Remove deprecated docker compose version field (#4) add eaeb170b24 Resolve weekday operator deprecations in tests (#40004) add 7cae13084c Fix Todo remove hybrid property hack (#39765) add 8a9699161f Remove colon from notes header to make headlines in UI consistent (#40001) add c2a57a79c5 Uuups, correct bracket in Bosch name list (#39998) add 385d79bfd7 Added ordering key option for PubSubPublishMessageOperator GCP Operator (#39955) add 57b12fe0ce Resolve dag 
serialization deprecations in tests (#40006) add 3d97474a49 Add Shahar to triage team (#40007) add 38e003c2c9 Warn on mini scheduler failures instead of debug (#39760) add 1436ca472f Update providers metadata 2024-05-30 (#40021) add bb06c56403 Allow Task Group Ids to be passed as branches in BranchMixIn (#38883) add 9b67a5ea33 Resolve trigger dagrun operator deprecations in tests (#40003) add 1bf847914e Enable templating in extraContainers and extraInitContainers (#38507) add da3a77a3d5 Fix triggerer race condition in HA setting (#38666) add 981ba8f005 Fixes KubernetesPodTrigger failing running pods with timeout (#40019) add 354863690f Add listeners for Dag import errors (#39739) add 651a6d6a68 standardizes template fields for `BaseSQLOperator` and adds `database` as a templated field (#39826) add b5bb039811 Fix bug that makes `AirflowSecurityManagerV2` leave transactions in the `idle in transaction` state (#39935) add 19c145c9ef Resolve aws emr deprecations in tests (#40020) add 4849fefb50 Resolve aws provider deprecations in tests (#40026) add dbb28d801c Resolve aws ecs deprecations in tests (#40016) add 8a32b940ce Bedrock system test adjustment (#40032) add 8b99ad0fbe Document `extraContainers` and `extraInit
(airflow) branch listener-task-timeout updated (460fadf1b9 -> 32e46dde86)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch listener-task-timeout in repository https://gitbox.apache.org/repos/asf/airflow.git discard 460fadf1b9 local task job: add timeout, to not kill on_task_instance_success listener prematurely add 9dd77520be Introduce Amazon Comprehend Service (#39592) add 6afc75fcce Add `breeze generate-migration-file` command (#39632) add a7960a1e16 Prevent generation of empty revision files by alembic autogenerate (#39634) add 610747d25a Add timeout when watching pod events in k8s executor (#39551) add e0dd075d1b AIP-21: yandexcloud: rename files, emit deprecation warning (#39618) add b0e3915316 Remove mysql/postgres from extras before determining the installation_command_flags (#39610) add 287c1887e9 feat: K8S 1.30 support & kind 0.23.0 (#39631) add 9284dc5391 Amazon Bedrock - Retrieve and RetrieveAndGenerate (#39500) add 81a82d8481 Run unit tests for Providers with airflow installed as package. (#39513) add 07c40bd78a Support failing tasks stuck in queued for hybrid executors (#39624) add 9ea78d9d72 Fix default value for aws batch operator retry strategy (#39608) add cba653ee23 Update contributing doc with breeze generate-migration-file command (#39655) add 435ba144d1 Doc fix: Remove misplaced backquotes in ``faq.rst`` (#39661) add 8987b09fd8 Use `compose-spec.json` which do not complaining about interpolations (#39662) add 9aae5311dd Revert "Use `compose-spec.json` which do not complaining about interpolations…" (#39669) add ea8ed7c43f Improvising high availability field name in hive hook (#39658) add f3687b68a6 Sagemaker trigger: pass the job name as part of the event (#39671) add 4dce7459d9 Fix #35946: Visible DAG RUN doesn't point to the same dag run id (#38365) add 448707c1d8 Add task failed dependencies to details page. 
(#38449) add cd0c6a7e77 Add Scarf based telemetry (#39510) add 4d0c7242bc Small refactor for example_bedrock_knowledge_base.py (#39672) add d4a5f4e3a7 Rename `telemetry-collection` to `usage-data-collection` (#39673) add a07d799482 Change dataset URI validation to raise warning instead of error in Airflow 2.9 (#39670) add e8183a9e8d Add indexes on dag_id column in referencing tables to speed up deletion of dag records (#39638) add 74c3fb366e Remove parent_model version suffix if it is passed to Vertex AI operators (#39640) add 8d1bd345b2 fix: empty openlineage dataset name for AthenaExtractor (#39677) add 4de79a0f6b feat: Add custom provider runtime checks (#39609) add 0b698a852b Add missing `dag_state_change_process_pool_size` in `provider.yaml`. (#39674) add 8b19b78ba5 Update plugins.rst examples to use pyproject.toml over setup.py (#39665) add 9ff245591e Fix the argument type of input_vectors in pinecone upsert (#39688) add 77a6b4f419 Add args to docker service ContainerSpec (#39464) add a3ae17f62f Update providers metadata 2024-05-17 (#39691) add db80e5e21c DbAPiHook: Don't log a warning message if placeholder is None and make sure warning message is formatted correctly (#39690) add 70f868e867 Print stderr from helm test failures (#39698) add a31169bd63 Pin google-cloud-bigquery to < 3.21.0 (#39583) add 27b3a22e34 Introduce anonymous credentials in GCP base hook (#39695) add 655bb601fe docs: fix typo in iceberg dags (#39705) add 79cf7e0609 weaviate remove deprecations (#39707) add cc9308c710 Reraise exception from strict dataset URI checks (#39719) add 4ee46b984d Remove `openlineage.common` dependencies in Google and Snowflake providers. (#39614) add 4d525aa32d Determine needs_expansion at time of serialization (#39604) add f509b0a924 More typing in TimeSensor and TimeSensorAsync (#39696) add a81504e316 chore: Update conf retrieval docstring and adjust pool_size (#39721) add b7671ef5ab Re-configure ORM in spawned OpenLineage process in scheduler. 
(#39735) add b41f429612 Remove deprecations eks (#39709) add 1da7f1f433 Pin requests due to incompatibility with docker-py (#39740) add ec2e245f0e Fetch served logs also when task attempt is up for retry and no remote logs available (#39496) add f18e6340d8 Better typing for BaseOperator `defer` (#39742) add 9532cc7a6c fix: Prevent error when extractor can't be imported (#39736) add 49b38719e2 Fix automatic termination issue in `EmrOperator` by ensuring `waiter_max_attempts` is set for deferrable triggers (#38658) add 0f717ea5e6 removed stale code (#39744) add 791f3cfc5c Fix acknowledged functionality in deferrable mode for PubSubPullSensor (#39711) add a78ee74b6a bugfix: handle invalid cluster states in NeptuneStopDbClusterOperator (#38287) add 60f8ff65d3 remove deprecation jdbc (#39733) add cee446467f remove deprecati
(airflow) branch main updated (5f2ebb312b -> aba8def5f3)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 5f2ebb312b ECS Overrides for AWS Batch submit_job (#39903) add aba8def5f3 Add error stacktrace to OpenLineage task event (#39813) No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/plugins/adapter.py | 12 ++- airflow/providers/openlineage/plugins/listener.py | 66 + .../providers/openlineage/plugins/test_listener.py | 109 - 3 files changed, 101 insertions(+), 86 deletions(-)
(airflow) branch main updated: Use UUIDv7 for OpenLineage runIds (#39889)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new 22305477bb Use UUIDv7 for OpenLineage runIds (#39889)
22305477bb is described below

commit 22305477bb056cb7a77af59f4dc906ff8a20583d
Author: Maxim Martynov
AuthorDate: Tue May 28 14:09:05 2024 +0300

    Use UUIDv7 for OpenLineage runIds (#39889)
---
 airflow/providers/openlineage/plugins/adapter.py   |  43 ++--
 airflow/providers/openlineage/plugins/listener.py  |  21 ++--
 airflow/providers/openlineage/plugins/macros.py    |   2 +-
 airflow/providers/openlineage/provider.yaml        |   4 +-
 generated/provider_dependencies.json               |   4 +-
 .../providers/openlineage/plugins/test_adapter.py  | 110 ++---
 .../providers/openlineage/plugins/test_listener.py |  53 +-
 tests/providers/openlineage/plugins/test_macros.py |  25 ++---
 8 files changed, 170 insertions(+), 92 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index e449668ef3..5a5b8ed34b 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -17,7 +17,6 @@ from __future__ import annotations
 
 import traceback
-import uuid
 from contextlib import ExitStack
 from typing import TYPE_CHECKING
@@ -36,6 +35,7 @@ from openlineage.client.facet import (
     SourceCodeLocationJobFacet,
 )
 from openlineage.client.run import Job, Run, RunEvent, RunState
+from openlineage.client.uuid import generate_static_uuid
 
 from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
@@ -43,6 +43,8 @@
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
+    from datetime import datetime
+
     from airflow.models.dagrun import DagRun
     from airflow.providers.openlineage.extractors import OperatorLineage
     from airflow.utils.log.secrets_masker import SecretsMasker
@@ -111,15 +113,25 @@ class OpenLineageAdapter(LoggingMixin):
         return yaml.safe_load(config_file)
 
     @staticmethod
-    def build_dag_run_id(dag_id, dag_run_id):
-        return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{conf.namespace()}.{dag_id}.{dag_run_id}"))
+    def build_dag_run_id(dag_id: str, execution_date: datetime) -> str:
+        return str(
+            generate_static_uuid(
+                instant=execution_date,
+                data=f"{conf.namespace()}.{dag_id}".encode(),
+            )
+        )
 
     @staticmethod
-    def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
+    def build_task_instance_run_id(
+        dag_id: str,
+        task_id: str,
+        try_number: int,
+        execution_date: datetime,
+    ):
         return str(
-            uuid.uuid3(
-                uuid.NAMESPACE_URL,
-                f"{conf.namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}",
+            generate_static_uuid(
+                instant=execution_date,
+                data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(),
             )
         )
@@ -306,7 +318,10 @@ class OpenLineageAdapter(LoggingMixin):
             eventTime=dag_run.start_date.isoformat(),
             job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
             run=self._build_run(
-                run_id=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
+                run_id=self.build_dag_run_id(
+                    dag_id=dag_run.dag_id,
+                    execution_date=dag_run.execution_date,
+                ),
                 job_name=dag_run.dag_id,
                 nominal_start_time=nominal_start_time,
                 nominal_end_time=nominal_end_time,
@@ -328,7 +343,12 @@
             eventType=RunState.COMPLETE,
             eventTime=dag_run.end_date.isoformat(),
             job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
-            run=Run(runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id)),
+            run=Run(
+                runId=self.build_dag_run_id(
+                    dag_id=dag_run.dag_id,
+                    execution_date=dag_run.execution_date,
+                ),
+            ),
             inputs=[],
             outputs=[],
             producer=_PRODUCER,
@@ -347,7 +367,10 @@
             eventTime=dag_run.end_date.isoformat(),
             job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
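The move from `uuid.uuid3` to `generate_static_uuid` keeps run IDs deterministic for a given DAG and execution date while gaining UUIDv7's time-ordered prefix. A rough sketch of the idea behind such a deterministic, UUIDv7-shaped identifier (illustrative only — `static_uuid7` is a hypothetical stand-in, not the openlineage-python implementation):

```python
import hashlib
import uuid
from datetime import datetime, timezone


def static_uuid7(instant: datetime, data: bytes) -> uuid.UUID:
    """Deterministic UUIDv7-shaped id: 48-bit ms timestamp prefix + hash-derived suffix.

    Sketch of the concept only; the actual bit layout used by
    openlineage-python's generate_static_uuid may differ.
    """
    ms = int(instant.timestamp() * 1000) & ((1 << 48) - 1)
    # Derive the non-timestamp bits deterministically from timestamp + data.
    digest = hashlib.sha256(ms.to_bytes(6, "big") + data).digest()
    raw = bytearray(ms.to_bytes(6, "big") + digest[:10])
    raw[6] = (raw[6] & 0x0F) | 0x70  # set version nibble to 7
    raw[8] = (raw[8] & 0x3F) | 0x80  # set RFC 4122 variant bits
    return uuid.UUID(bytes=bytes(raw))
```

Because the millisecond timestamp occupies the leading 48 bits, IDs for later execution dates sort after earlier ones, while the same `(instant, data)` pair always yields the same ID — the property the commit relies on when rebuilding run IDs from listener hooks.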
(airflow) 01/01: local task job: add timeout, to not kill on_task_instance_success listener prematurely
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch listener-task-timeout
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 460fadf1b97c645c00ef38a7877d49f6a643ce13
Author: Maciej Obuchowski
AuthorDate: Fri May 24 17:17:01 2024 +0200

    local task job: add timeout, to not kill on_task_instance_success listener prematurely
    
    Signed-off-by: Maciej Obuchowski
---
 airflow/config_templates/config.yml               |   7 +
 airflow/jobs/local_task_job_runner.py             |  13 +-
 airflow/providers/openlineage/plugins/listener.py |   1 -
 tests/dags/test_mark_state.py                     |  15 +++
 tests/jobs/test_local_task_job.py                 | 150 +-
 tests/listeners/slow_listener.py                  |  26
 tests/listeners/very_slow_listener.py             |  26
 7 files changed, 235 insertions(+), 3 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 95d83f9d4c..b71414f6ae 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -329,6 +329,13 @@ core:
       type: string
       example: ~
       default: "downstream"
+    task_listener_timeout:
+      description: |
+        Maximum possible time (in seconds) that task listener will have for their execution.
+      version_added: 2.10.0
+      type: integer
+      example: ~
+      default: "20"
     default_task_execution_timeout:
       description: |
         The default task execution_timeout value for the operators. Expected an integer value to
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index bb520825f2..c26f6735b4 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -26,6 +26,7 @@
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.jobs.job import perform_heartbeat
+from airflow.listeners.listener import get_listener_manager
 from airflow.models.taskinstance import TaskReturnCode
 from airflow.stats import Stats
 from airflow.utils import timezone
@@ -110,6 +111,8 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
         self.terminating = False
         self._state_change_checks = 0
+        # time spend after task completed, but before it exited - used to measure listener execution time
+        self._overtime = 0.0
 
     def _execute(self) -> int | None:
         from airflow.task.task_runner import get_task_runner
@@ -195,7 +198,6 @@
                 self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit,
             ),
         )
-
         return_code = self.task_runner.return_code(timeout=max_wait_time)
         if return_code is not None:
             self.handle_task_exit(return_code)
@@ -290,6 +292,7 @@
             )
             raise AirflowException("PID of job runner does not match")
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
+            self._overtime = (timezone.utcnow() - (ti.end_date or timezone.utcnow())).total_seconds()
             if ti.state == TaskInstanceState.SKIPPED:
                 # A DagRun timeout will cause tasks to be externally marked as skipped.
                 dagrun = ti.get_dagrun(session=session)
@@ -303,6 +306,14 @@
             if dagrun_timeout and execution_time > dagrun_timeout:
                 self.log.warning("DagRun timed out after %s.", execution_time)
 
+            # If process still runs after being marked as success, let it run until configured overtime
+            # if there are configured listeners
+            if (
+                ti.state == TaskInstanceState.SUCCESS
+                and self._overtime < conf.getint("core", "task_listener_timeout")
+                and get_listener_manager().has_listeners
+            ):
+                return
             # potential race condition, the _run_raw_task commits `success` or other state
             # but task_runner does not exit right away due to slow process shutdown or any other reasons
             # let's do a throttle here, if the above case is true, the handle_task_exit will handle it
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 73f8c8c79e..3df5f36d6c 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -130,7 +130,6 @@ class OpenLineageListener:
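The new heartbeat logic boils down to one predicate: keep the local task job alive while a successful task's post-completion "overtime" is still under `core.task_listener_timeout` and at least one listener is registered. A standalone restatement of that condition (names are illustrative, not the Airflow API):

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def listener_grace_active(
    state: str,
    end_date: datetime | None,
    now: datetime,
    timeout_s: int,
    has_listeners: bool,
) -> bool:
    """Return True while the heartbeat should defer termination so listeners can finish.

    Mirrors the condition added in the hunk above: overtime is the wall-clock
    time since the task instance's end_date (0 if it has none yet).
    """
    overtime = (now - (end_date or now)).total_seconds()
    return state == "success" and overtime < timeout_s and has_listeners
```

Once overtime crosses the configured timeout, the check falls through to the existing externally-set-state handling and the instance is terminated as before.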
(airflow) branch listener-task-timeout created (now 460fadf1b9)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch listener-task-timeout
in repository https://gitbox.apache.org/repos/asf/airflow.git

  at 460fadf1b9 local task job: add timeout, to not kill on_task_instance_success listener prematurely

This branch includes the following new commits:

     new 460fadf1b9 local task job: add timeout, to not kill on_task_instance_success listener prematurely

The 1 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails. The revisions listed as "add" were
already present in the repository and have only been added to this reference.
(airflow) branch main updated: typo: wrong OpenLineage facet key in spec (#39782)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new 57dab1303c typo: wrong OpenLineage facet key in spec (#39782)
57dab1303c is described below

commit 57dab1303c5850b9b2ff81e64cbbe5bd4d1032f9
Author: Kacper Muda
AuthorDate: Thu May 23 15:53:07 2024 +0200

    typo: wrong OpenLineage facet key in spec (#39782)
    
    Signed-off-by: Kacper Muda
---
 .../providers/google/cloud/openlineage/BigQueryErrorRunFacet.json | 2 +-
 airflow/providers/google/cloud/openlineage/utils.py               | 6 +-
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json b/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json
index f85756ee92..3213f9b8b2 100644
--- a/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json
+++ b/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json
@@ -23,7 +23,7 @@
   },
   "type": "object",
   "properties": {
-    "bigQueryJob": {
+    "bigQuery_error": {
       "$ref": "#/$defs/BigQueryErrorRunFacet"
     }
   }
diff --git a/airflow/providers/google/cloud/openlineage/utils.py b/airflow/providers/google/cloud/openlineage/utils.py
index 9f18aac9f4..fb0d4c663b 100644
--- a/airflow/providers/google/cloud/openlineage/utils.py
+++ b/airflow/providers/google/cloud/openlineage/utils.py
@@ -269,7 +269,7 @@ class _BigQueryOpenLineageMixin:
             if hasattr(self, "log"):
                 self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True)
             exception_msg = traceback.format_exc()
-            # TODO: remove ErrorMessageRunFacet in next release
+            # TODO: remove BigQueryErrorRunFacet in next release
             run_facets.update(
                 {
                     "errorMessage": ErrorMessageRunFacet(
@@ -282,10 +282,6 @@ class _BigQueryOpenLineageMixin:
             }
         )
         deduplicated_outputs = self._deduplicate_outputs(outputs)
-        # For complex scripts there can be multiple outputs - in that case keep them all in `outputs` and
-        # leave the `output` empty to avoid providing misleading information. When the script has a single
-        # output (f.e. a single statement with some variable declarations), treat it as a regular non-script
-        # job and put the output in `output` as an addition to new `outputs`. `output` is deprecated.
         return inputs, deduplicated_outputs, run_facets
 
     def _deduplicate_outputs(self, outputs: list[Dataset | None]) -> list[Dataset]:
(airflow) 02/02: Reapply "openlineage, snowflake: do not run external queries for Snowflake"
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-processexecution
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 098415ebcc2ed458dc4f6950bf3277f09376953e
Author: Maciej Obuchowski
AuthorDate: Wed May 22 17:54:38 2024 +0200

    Reapply "openlineage, snowflake: do not run external queries for Snowflake"
    
    This reverts commit c62111d2b742ae1feff5fd80e0e9414065166703.
---
 airflow/providers/common/sql/operators/sql.py      |  9 +++
 airflow/providers/openlineage/sqlparser.py         | 61 +++
 airflow/providers/openlineage/utils/utils.py       |  5 ++
 airflow/providers/snowflake/hooks/snowflake.py     | 24 ++--
 .../providers/snowflake/hooks/snowflake_sql_api.py |  8 +--
 .../amazon/aws/operators/test_redshift_sql.py      |  6 +-
 tests/providers/snowflake/hooks/test_snowflake.py  | 37 +---
 .../snowflake/hooks/test_snowflake_sql_api.py      | 32 +++---
 .../snowflake/operators/test_snowflake_sql.py      | 69 --
 9 files changed, 128 insertions(+), 123 deletions(-)

diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py
index 1fd22b86b7..ea791992d5 100644
--- a/airflow/providers/common/sql/operators/sql.py
+++ b/airflow/providers/common/sql/operators/sql.py
@@ -309,6 +309,14 @@ class SQLExecuteQueryOperator(BaseSQLOperator):
 
         hook = self.get_db_hook()
 
+        try:
+            from airflow.providers.openlineage.utils.utils import should_use_external_connection
+
+            use_external_connection = should_use_external_connection(hook)
+        except ImportError:
+            # OpenLineage provider release < 1.8.0 - we always use connection
+            use_external_connection = True
+
         connection = hook.get_connection(getattr(hook, hook.conn_name_attr))
         try:
             database_info = hook.get_openlineage_database_info(connection)
@@ -334,6 +342,7 @@
             database_info=database_info,
             database=self.database,
             sqlalchemy_engine=hook.get_sqlalchemy_engine(),
+            use_connection=use_external_connection,
         )
         return operator_lineage
diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py
index c27dedc53c..f181ff8cce 100644
--- a/airflow/providers/openlineage/sqlparser.py
+++ b/airflow/providers/openlineage/sqlparser.py
@@ -29,6 +29,7 @@ from openlineage.client.facet import (
     ExtractionErrorRunFacet,
     SqlJobFacet,
 )
+from openlineage.client.run import Dataset
 from openlineage.common.sql import DbTableMeta, SqlMeta, parse
 
 from airflow.providers.openlineage.extractors.base import OperatorLineage
@@ -40,7 +41,6 @@ from airflow.providers.openlineage.utils.sql import (
 )
 from airflow.typing_compat import TypedDict
 
 if TYPE_CHECKING:
-    from openlineage.client.run import Dataset
     from sqlalchemy.engine import Engine
 
     from airflow.hooks.base import BaseHook
@@ -104,6 +104,18 @@ class DatabaseInfo:
     normalize_name_method: Callable[[str], str] = default_normalize_name_method
 
 
+def from_table_meta(
+    table_meta: DbTableMeta, database: str | None, namespace: str, is_uppercase: bool
+) -> Dataset:
+    if table_meta.database:
+        name = table_meta.qualified_name
+    elif database:
+        name = f"{database}.{table_meta.schema}.{table_meta.name}"
+    else:
+        name = f"{table_meta.schema}.{table_meta.name}"
+    return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper())
+
+
 class SQLParser:
     """Interface for openlineage-sql.
@@ -117,7 +129,7 @@ class SQLParser:
 
     def parse(self, sql: list[str] | str) -> SqlMeta | None:
         """Parse a single or a list of SQL statements."""
-        return parse(sql=sql, dialect=self.dialect)
+        return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema)
 
     def parse_table_schemas(
         self,
@@ -156,6 +168,23 @@
             else None,
         )
 
+    def get_metadata_from_parser(
+        self,
+        inputs: list[DbTableMeta],
+        outputs: list[DbTableMeta],
+        database_info: DatabaseInfo,
+        namespace: str = DEFAULT_NAMESPACE,
+        database: str | None = None,
+    ) -> tuple[list[Dataset], ...]:
+        database = database if database else database_info.database
+        return [
+            from_table_meta(dataset, database, namespace, database_info.is_uppercase_names)
+            for dataset in inputs
+        ], [
+            from_table_meta(dataset, database, namespace, database_info.is_uppercase_names)
+            for dataset in outputs
+        ]
+
     def attach_column_lineage(
         self, datasets: list[Dataset], database:
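The reapplied `from_table_meta` helper resolves a dataset name by preferring the table's own database, then the caller-supplied default, and finally falling back to a schema-qualified name. A self-contained sketch with simplified stand-in types (`TableMeta` here is hypothetical; the real helper takes a `DbTableMeta` and returns an OpenLineage `Dataset` with a namespace attached):

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TableMeta:
    """Simplified stand-in for openlineage's DbTableMeta."""
    name: str
    schema: str
    database: str | None = None

    @property
    def qualified_name(self) -> str:
        return f"{self.database}.{self.schema}.{self.name}"


def dataset_name(meta: TableMeta, database: str | None, is_uppercase: bool) -> str:
    # Same fallback chain as from_table_meta in the diff above.
    if meta.database:
        name = meta.qualified_name
    elif database:
        name = f"{database}.{meta.schema}.{meta.name}"
    else:
        name = f"{meta.schema}.{meta.name}"
    return name.upper() if is_uppercase else name
```

This is what lets the parser build dataset names purely from parsed SQL plus connection metadata, instead of querying the database (the point of "do not run external queries for Snowflake").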
(airflow) 01/02: fork openlineage
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-processexecution
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d2340c362d88ce6bebde2a962d1f93228e50b0c5
Author: Maciej Obuchowski
AuthorDate: Tue May 21 15:44:21 2024 +0200

    fork openlineage
    
    Signed-off-by: Maciej Obuchowski
---
 airflow/config_templates/config.yml               |  7 ++
 airflow/jobs/local_task_job_runner.py             | 19 +++--
 airflow/models/taskinstance.py                    |  2 +
 airflow/providers/openlineage/plugins/listener.py | 86 +++
 4 files changed, 81 insertions(+), 33 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 95d83f9d4c..d23cf2612b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -329,6 +329,13 @@ core:
       type: string
       example: ~
       default: "downstream"
+    task_listener_timeout:
+      description: |
+        Maximum possible time (in seconds) that task listener will have for their execution.
+      version_added: 2.10.0
+      type: integer
+      example: ~
+      default: "10"
     default_task_execution_timeout:
       description: |
         The default task execution_timeout value for the operators. Expected an integer value to
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index bb520825f2..ccd6622ba6 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -110,6 +110,8 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
         self.terminating = False
         self._state_change_checks = 0
+        # time spend after task completed, but before it exited - used to measure listener execution time
+        self._overtime = 0.0
 
     def _execute(self) -> int | None:
         from airflow.task.task_runner import get_task_runner
@@ -195,7 +197,7 @@
                 self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit,
             ),
         )
-
+        self.log.error("WAITING TIME %f", max_wait_time)
         return_code = self.task_runner.return_code(timeout=max_wait_time)
         if return_code is not None:
             self.handle_task_exit(return_code)
@@ -251,6 +253,7 @@
     def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
         """Self destruct task if state has been moved away from running externally."""
         if self.terminating:
+            self.log.error("TERMINATING")
             # ensure termination if processes are created later
             self.task_runner.terminate()
             return
@@ -290,6 +293,8 @@
             )
             raise AirflowException("PID of job runner does not match")
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
+            self._overtime = (timezone.utcnow() - (ti.end_date or timezone.utcnow())).total_seconds()
+            self.log.error("checking process code? overtime %f", self._overtime)
             if ti.state == TaskInstanceState.SKIPPED:
                 # A DagRun timeout will cause tasks to be externally marked as skipped.
                 dagrun = ti.get_dagrun(session=session)
@@ -304,13 +309,19 @@
                 self.log.warning("DagRun timed out after %s.", execution_time)
 
             # potential race condition, the _run_raw_task commits `success` or other state
-            # but task_runner does not exit right away due to slow process shutdown or any other reasons
-            # let's do a throttle here, if the above case is true, the handle_task_exit will handle it
-            if self._state_change_checks >= 1:  # defer to next round of heartbeat
+            # but task_runner does not exit right away due to slow process shutdown, listener execution
+            # or any other reasons - let's do a throttle here, if the above case is true, the
+            # handle_task_exit will handle it
+            if self._state_change_checks >= 1 and self._overtime > conf.getint(
+                "core", "task_listener_timeout"
+            ):
+                self.log.warning("Overtime: %f", self._overtime)
                 self.log.warning(
                     "State of this instance has been externally set to %s. Terminating instance.", ti.state
                 )
                 self.terminating = True
+            else:
+                self.lo
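Compared with the listener-task-timeout branch above, this version gates termination on both the pre-existing one-heartbeat throttle and an exhausted listener-overtime budget. The combined condition, restated as a hypothetical standalone function (names are illustrative):

```python
def should_terminate(state_change_checks: int, overtime_s: float, listener_timeout_s: int) -> bool:
    """Terminate only after at least one deferred heartbeat round AND once the
    listener overtime budget is spent — a restatement of the hunk above."""
    return state_change_checks >= 1 and overtime_s > listener_timeout_s
```

Either guard alone is insufficient: the throttle covers the slow-shutdown race, while the overtime budget covers long-running `on_task_instance_success` listeners.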
(airflow) branch openlineage-processexecution updated (b114fd6468 -> 098415ebcc)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch openlineage-processexecution
in repository https://gitbox.apache.org/repos/asf/airflow.git

 discard b114fd6468 fork openlineage
     new d2340c362d fork openlineage
     new 098415ebcc Reapply "openlineage, snowflake: do not run external queries for Snowflake"

This update added new revisions after undoing existing revisions. That is to
say, some revisions that were in the old version of the branch are not in the
new version. This situation occurs when a user --force pushes a change and
generates a repository containing something like this:

 * -- * -- B -- O -- O -- O   (b114fd6468)
            \
             N -- N -- N   refs/heads/openlineage-processexecution (098415ebcc)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from the
common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them. Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails. The revisions listed as "add" were
already present in the repository and have only been added to this reference.

Summary of changes:
 airflow/config_templates/config.yml                |  7 +++
 airflow/jobs/local_task_job_runner.py              | 15 +++--
 airflow/providers/common/sql/operators/sql.py      |  9 +++
 airflow/providers/openlineage/plugins/listener.py  | 22 ---
 airflow/providers/openlineage/sqlparser.py         | 61 +++
 airflow/providers/openlineage/utils/utils.py       |  5 ++
 airflow/providers/snowflake/hooks/snowflake.py     | 24 ++--
 .../providers/snowflake/hooks/snowflake_sql_api.py |  8 +--
 .../amazon/aws/operators/test_redshift_sql.py      |  6 +-
 tests/providers/snowflake/hooks/test_snowflake.py  | 37 +---
 .../snowflake/hooks/test_snowflake_sql_api.py      | 32 +++---
 .../snowflake/operators/test_snowflake_sql.py      | 69 --
 12 files changed, 160 insertions(+), 135 deletions(-)
(airflow) branch main updated (f18e6340d8 -> 9532cc7a6c)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

    from f18e6340d8 Better typing for BaseOperator `defer` (#39742)
     add 9532cc7a6c fix: Prevent error when extractor can't be imported (#39736)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/openlineage/extractors/manager.py | 21 +++--
 airflow/providers/openlineage/utils/utils.py        |  6 ++
 2 files changed, 17 insertions(+), 10 deletions(-)
(airflow) 03/03: fork openlineage
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-processexecution
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b114fd646800da1b3b149d8e926d294b1bd216f6
Author: Maciej Obuchowski
AuthorDate: Tue May 21 15:44:21 2024 +0200

    fork openlineage
    
    Signed-off-by: Maciej Obuchowski
---
 airflow/jobs/local_task_job_runner.py             |  6 +-
 airflow/models/taskinstance.py                    |  2 +
 airflow/providers/openlineage/plugins/listener.py | 80 +++
 3 files changed, 58 insertions(+), 30 deletions(-)

diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index bb520825f2..09904fad98 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -195,7 +195,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
                 self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit,
             ),
         )
-
+        self.log.error("WAITING TIME %f", max_wait_time)
         return_code = self.task_runner.return_code(timeout=max_wait_time)
         if return_code is not None:
             self.handle_task_exit(return_code)
@@ -251,6 +251,7 @@
     def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
         """Self destruct task if state has been moved away from running externally."""
         if self.terminating:
+            self.log.error("TERMINATING")
             # ensure termination if processes are created later
             self.task_runner.terminate()
             return
@@ -290,6 +291,7 @@
             )
             raise AirflowException("PID of job runner does not match")
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
+            self.log.error("checking process code?")
             if ti.state == TaskInstanceState.SKIPPED:
                 # A DagRun timeout will cause tasks to be externally marked as skipped.
                 dagrun = ti.get_dagrun(session=session)
@@ -311,6 +313,8 @@
                     "State of this instance has been externally set to %s. Terminating instance.", ti.state
                 )
                 self.terminating = True
+            else:
+                self.log.error("still checking - will kill on next call")
             self._state_change_checks += 1
 
     def _log_return_code_metric(self, return_code: int):
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a27579b05e..0dd5cb3175 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2525,10 +2525,12 @@ class TaskInstance(Base, LoggingMixin):
         except (AirflowFailException, AirflowSensorTimeout) as e:
             # If AirflowFailException is raised, task should not retry.
             # If a sensor in reschedule mode reaches timeout, task should not retry.
+            self.log.exception("FAIL?")
             self.handle_failure(e, test_mode, context, force_fail=True, session=session)
             session.commit()
             raise
         except (AirflowTaskTimeout, AirflowException, AirflowTaskTerminated) as e:
+            self.log.exception("TIMEOUT? WUT?")
             if not test_mode:
                 self.refresh_from_db(lock_for_update=True, session=session)
                 # for case when task is marked as success/failed externally
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index ef736308f7..5984cb957a 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import logging
 import os
+import time
 from concurrent.futures import ProcessPoolExecutor
 from datetime import datetime
 from typing import TYPE_CHECKING
@@ -26,7 +27,6 @@
 import psutil
 from openlineage.client.serde import Serde
 
 from airflow import __version__ as airflow_version
-from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.listeners import hookimpl
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.extractors import ExtractorManager
@@ -133,7 +133,9 @@
             dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None
         )
         data_interval_end = dagrun.data_interval_end.isoformat() if dagrun.data_interval_end else None
-
+
(airflow) 01/03: openlineage: execute extraction and message sending in separate process
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-processexecution
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 80e6d94836f85f6da4a0df98f3a781b8560f8566
Author: Maciej Obuchowski
AuthorDate: Wed May 15 16:09:46 2024 +0200

    openlineage: execute extraction and message sending in separate process
    
    Signed-off-by: Maciej Obuchowski
---
 airflow/providers/openlineage/plugins/listener.py | 50 +--
 1 file changed, 46 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 73f8c8c79e..ef736308f7 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,13 +17,16 @@ from __future__ import annotations
 
 import logging
+import os
 from concurrent.futures import ProcessPoolExecutor
 from datetime import datetime
 from typing import TYPE_CHECKING
 
+import psutil
 from openlineage.client.serde import Serde
 
 from airflow import __version__ as airflow_version
+from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.listeners import hookimpl
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.extractors import ExtractorManager
@@ -71,7 +74,7 @@ class OpenLineageListener:
         self,
         previous_state,
         task_instance: TaskInstance,
-        session: Session,  # This will always be QUEUED
+        session: Session,
     ):
         if not getattr(task_instance, "task", None) is not None:
             self.log.warning(
@@ -153,7 +156,20 @@
             len(Serde.to_json(redacted_event).encode("utf-8")),
         )
 
-        on_running()
+        pid = os.fork()
+        if pid:
+            process = psutil.Process(pid)
+            process.wait(5)
+        else:
+            if not InternalApiConfig.get_use_internal_api():
+                # Force a new SQLAlchemy session. We can't share open DB handles
+                # between process. The cli code will re-create this as part of its
+                # normal startup
+                from airflow import settings
+
+                settings.engine.pool.dispose()
+                settings.engine.dispose()
+            on_running()
 
     @hookimpl
     def on_task_instance_success(self, previous_state, task_instance: TaskInstance, session):
@@ -215,7 +231,20 @@
             len(Serde.to_json(redacted_event).encode("utf-8")),
         )
 
-        on_success()
+        pid = os.fork()
+        if pid:
+            process = psutil.Process(pid)
+            process.wait(5)
+        else:
+            if not InternalApiConfig.get_use_internal_api():
+                # Force a new SQLAlchemy session. We can't share open DB handles
+                # between process. The cli code will re-create this as part of its
+                # normal startup
+                from airflow import settings
+
+                settings.engine.pool.dispose()
+                settings.engine.dispose()
+            on_success()
 
     @hookimpl
     def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, session):
@@ -277,7 +306,20 @@
             len(Serde.to_json(redacted_event).encode("utf-8")),
         )
 
-        on_failure()
+        pid = os.fork()
+        if pid:
+            process = psutil.Process(pid)
+            process.wait(5)
+        else:
+            if not InternalApiConfig.get_use_internal_api():
+                # Force a new SQLAlchemy session. We can't share open DB handles
+                # between process. The cli code will re-create this as part of its
+                # normal startup
+                from airflow import settings
+
+                settings.engine.pool.dispose()
+                settings.engine.dispose()
+            on_failure()
 
     @property
     def executor(self):
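The pattern in this commit — fork, let the child do the extraction and event emission, and have the parent wait a bounded time — can be sketched with the stdlib alone (using an `os.waitpid` polling loop in place of `psutil.Process.wait`; POSIX only, and `run_in_fork` is a hypothetical helper, not the listener's actual code):

```python
import os
import time


def run_in_fork(fn, timeout: float = 5.0) -> bool:
    """Run fn in a forked child; parent polls for the child's exit up to `timeout` s.

    Returns True if the child exited within the timeout, False otherwise.
    """
    pid = os.fork()
    if pid:  # parent: non-blocking wait loop
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            done_pid, _ = os.waitpid(pid, os.WNOHANG)
            if done_pid:
                return True
            time.sleep(0.05)
        return False
    # child: do the work, then exit without running parent atexit handlers
    try:
        fn()
    finally:
        os._exit(0)
```

In the real listener the child additionally disposes the inherited SQLAlchemy engine and pool before touching the DB, since open handles cannot safely be shared across a fork.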
(airflow) 02/03: Revert "openlineage, snowflake: do not run external queries for Snowflake"
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-processexecution
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c62111d2b742ae1feff5fd80e0e9414065166703
Author: Maciej Obuchowski
AuthorDate: Wed May 15 16:12:13 2024 +0200

    Revert "openlineage, snowflake: do not run external queries for Snowflake"
    
    This reverts commit 3b26f586a7db17be226d59be522c4cf7bbfda552.
---
 airflow/providers/common/sql/operators/sql.py      |  9 ---
 airflow/providers/openlineage/sqlparser.py         | 61 ---
 airflow/providers/openlineage/utils/utils.py       |  5 --
 airflow/providers/snowflake/hooks/snowflake.py     | 24 ++--
 .../providers/snowflake/hooks/snowflake_sql_api.py |  8 +--
 .../amazon/aws/operators/test_redshift_sql.py      |  6 +-
 tests/providers/snowflake/hooks/test_snowflake.py  | 37 +++-
 .../snowflake/hooks/test_snowflake_sql_api.py      | 32 +++---
 .../snowflake/operators/test_snowflake_sql.py      | 69 ++
 9 files changed, 123 insertions(+), 128 deletions(-)

diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py
index ea791992d5..1fd22b86b7 100644
--- a/airflow/providers/common/sql/operators/sql.py
+++ b/airflow/providers/common/sql/operators/sql.py
@@ -309,14 +309,6 @@ class SQLExecuteQueryOperator(BaseSQLOperator):
 
         hook = self.get_db_hook()
 
-        try:
-            from airflow.providers.openlineage.utils.utils import should_use_external_connection
-
-            use_external_connection = should_use_external_connection(hook)
-        except ImportError:
-            # OpenLineage provider release < 1.8.0 - we always use connection
-            use_external_connection = True
-
         connection = hook.get_connection(getattr(hook, hook.conn_name_attr))
         try:
             database_info = hook.get_openlineage_database_info(connection)
@@ -342,7 +334,6 @@
             database_info=database_info,
             database=self.database,
             sqlalchemy_engine=hook.get_sqlalchemy_engine(),
-            use_connection=use_external_connection,
         )
         return operator_lineage
diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py
index f181ff8cce..c27dedc53c 100644
--- a/airflow/providers/openlineage/sqlparser.py
+++ b/airflow/providers/openlineage/sqlparser.py
@@ -29,7 +29,6 @@ from openlineage.client.facet import (
     ExtractionErrorRunFacet,
     SqlJobFacet,
 )
-from openlineage.client.run import Dataset
 from openlineage.common.sql import DbTableMeta, SqlMeta, parse
 
 from airflow.providers.openlineage.extractors.base import OperatorLineage
@@ -41,6 +40,7 @@ from airflow.providers.openlineage.utils.sql import (
 )
 from airflow.typing_compat import TypedDict
 
 if TYPE_CHECKING:
+    from openlineage.client.run import Dataset
     from sqlalchemy.engine import Engine
 
     from airflow.hooks.base import BaseHook
@@ -104,18 +104,6 @@ class DatabaseInfo:
     normalize_name_method: Callable[[str], str] = default_normalize_name_method
 
 
-def from_table_meta(
-    table_meta: DbTableMeta, database: str | None, namespace: str, is_uppercase: bool
-) -> Dataset:
-    if table_meta.database:
-        name = table_meta.qualified_name
-    elif database:
-        name = f"{database}.{table_meta.schema}.{table_meta.name}"
-    else:
-        name = f"{table_meta.schema}.{table_meta.name}"
-    return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper())
-
-
 class SQLParser:
     """Interface for openlineage-sql.
@@ -129,7 +117,7 @@ class SQLParser:
 
     def parse(self, sql: list[str] | str) -> SqlMeta | None:
         """Parse a single or a list of SQL statements."""
-        return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema)
+        return parse(sql=sql, dialect=self.dialect)
 
     def parse_table_schemas(
         self,
@@ -168,23 +156,6 @@
             else None,
         )
 
-    def get_metadata_from_parser(
-        self,
-        inputs: list[DbTableMeta],
-        outputs: list[DbTableMeta],
-        database_info: DatabaseInfo,
-        namespace: str = DEFAULT_NAMESPACE,
-        database: str | None = None,
-    ) -> tuple[list[Dataset], ...]:
-        database = database if database else database_info.database
-        return [
-            from_table_meta(dataset, database, namespace, database_info.is_uppercase_names)
-            for dataset in inputs
-        ], [
-            from_table_meta(dataset, database, namespace, database_info.is_uppercase_names)
-            for dataset in outputs
-        ]
-
     def attach_column_lineage(
         self, datasets: list[Dataset], database:
(airflow) branch openlineage-processexecution created (now b114fd6468)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch openlineage-processexecution in repository https://gitbox.apache.org/repos/asf/airflow.git at b114fd6468 fork openlineage This branch includes the following new commits: new 80e6d94836 openlineage: execute extraction and message sending in separate process new c62111d2b7 Revert "openlineage, snowflake: do not run external queries for Snowflake" new b114fd6468 fork openlineage The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(airflow) branch main updated (a81504e316 -> b7671ef5ab)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from a81504e316 chore: Update conf retrieval docstring and adjust pool_size (#39721) add b7671ef5ab Re-configure ORM in spawned OpenLineage process in scheduler. (#39735) No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/plugins/adapter.py | 95 +- airflow/providers/openlineage/plugins/listener.py | 12 ++- .../providers/openlineage/plugins/test_listener.py | 2 +- 3 files changed, 68 insertions(+), 41 deletions(-)
(airflow) branch main updated (f509b0a924 -> a81504e316)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from f509b0a924 More typing in TimeSensor and TimeSensorAsync (#39696) add a81504e316 chore: Update conf retrieval docstring and adjust pool_size (#39721) No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/conf.py| 24 -- tests/providers/openlineage/test_conf.py | 56 2 files changed, 78 insertions(+), 2 deletions(-)
(airflow) branch main updated (cc9308c710 -> 4ee46b984d)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from cc9308c710 Reraise exception from strict dataset URI checks (#39719) add 4ee46b984d Remove `openlineage.common` dependencies in Google and Snowflake providers. (#39614) No new revisions were added by this update. Summary of changes: .../cloud/openlineage/BigQueryErrorRunFacet.json | 30 ++ .../cloud/openlineage/BigQueryJobRunFacet.json | 37 ++ .../google/cloud/openlineage}/__init__.py | 0 .../providers/google/cloud/openlineage/utils.py| 392 + .../providers/google/cloud/operators/bigquery.py | 63 +--- .../google/cloud/transfers/bigquery_to_gcs.py | 2 +- .../google/cloud/transfers/gcs_to_bigquery.py | 2 +- .../providers/google/cloud/utils/openlineage.py| 81 - airflow/providers/openlineage/utils/utils.py | 11 +- airflow/providers/snowflake/hooks/snowflake.py | 5 +- airflow/providers/snowflake/utils/openlineage.py | 63 .../google/cloud/openlineage}/__init__.py | 0 .../google/cloud/openlineage/test_utils.py | 369 +++ .../google/cloud/operators/test_bigquery.py| 15 +- .../cloud/{operators => utils}/job_details.json| 0 .../google/cloud/utils/out_table_details.json | 30 ++ .../google/cloud/utils/script_job_details.json | 36 ++ .../google/cloud/utils/table_details.json | 53 +++ .../google/cloud/utils/test_openlineage.py | 142 tests/providers/openlineage/utils/test_sql.py | 11 - .../providers/snowflake/utils/test_openlineage.py | 62 21 files changed, 1093 insertions(+), 311 deletions(-) create mode 100644 airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json create mode 100644 airflow/providers/google/cloud/openlineage/BigQueryJobRunFacet.json copy airflow/{api_connexion => providers/google/cloud/openlineage}/__init__.py (100%) create mode 100644 airflow/providers/google/cloud/openlineage/utils.py delete mode 100644 airflow/providers/google/cloud/utils/openlineage.py create mode 100644 
airflow/providers/snowflake/utils/openlineage.py copy {airflow/api_connexion => tests/providers/google/cloud/openlineage}/__init__.py (100%) create mode 100644 tests/providers/google/cloud/openlineage/test_utils.py rename tests/providers/google/cloud/{operators => utils}/job_details.json (100%) create mode 100644 tests/providers/google/cloud/utils/out_table_details.json create mode 100644 tests/providers/google/cloud/utils/script_job_details.json create mode 100644 tests/providers/google/cloud/utils/table_details.json delete mode 100644 tests/providers/google/cloud/utils/test_openlineage.py create mode 100644 tests/providers/snowflake/utils/test_openlineage.py
(airflow) branch main updated: fix: empty openlineage dataset name for AthenaExtractor (#39677)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 8d1bd345b2 fix: empty openlineage dataset name for AthenaExtractor (#39677) 8d1bd345b2 is described below commit 8d1bd345b2f343c9abe83e6dc5a71604796f0085 Author: Kacper Muda AuthorDate: Fri May 17 11:51:32 2024 +0200 fix: empty openlineage dataset name for AthenaExtractor (#39677) Signed-off-by: Kacper Muda --- airflow/providers/amazon/aws/operators/athena.py| 2 +- tests/providers/amazon/aws/operators/test_athena.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index a9e09fb325..48b4a8254c 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -266,7 +266,7 @@ class AthenaOperator(AwsBaseOperator[AthenaHook]): if self.output_location: parsed = urlparse(self.output_location) - outputs.append(Dataset(namespace=f"{parsed.scheme}://{parsed.netloc}", name=parsed.path)) + outputs.append(Dataset(namespace=f"{parsed.scheme}://{parsed.netloc}", name=parsed.path or "/")) return OperatorLineage(job_facets=job_facets, run_facets=run_facets, inputs=inputs, outputs=outputs) diff --git a/tests/providers/amazon/aws/operators/test_athena.py b/tests/providers/amazon/aws/operators/test_athena.py index d4ccf521a7..66fb6b297f 100644 --- a/tests/providers/amazon/aws/operators/test_athena.py +++ b/tests/providers/amazon/aws/operators/test_athena.py @@ -279,7 +279,7 @@ class TestAthenaOperator: task_id="test_athena_openlineage", query="INSERT INTO TEST_TABLE SELECT CUSTOMER_EMAIL FROM DISCOUNTS", database="TEST_DATABASE", -output_location="s3://test_s3_bucket/", +output_location="s3://test_s3_bucket", client_request_token="eac427d0-1c6d-4dfb-96aa-2835d3ac6595", 
sleep_time=0, max_polling_attempts=3,
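The one-character fix above guards against `urlparse` returning an empty `path` when the output location is a bare bucket. A minimal standalone reproduction (a sketch of the fixed logic, not the operator itself):

```python
from urllib.parse import urlparse

def athena_output_dataset(output_location: str) -> tuple[str, str]:
    """Split an S3 output location into an OpenLineage (namespace, name)
    pair the way the fixed AthenaOperator does: for a bare bucket such as
    "s3://bucket", urlparse yields path == "", so the name falls back to
    "/" instead of being empty."""
    parsed = urlparse(output_location)
    return f"{parsed.scheme}://{parsed.netloc}", parsed.path or "/"
```

Note the test in the diff switched `"s3://test_s3_bucket/"` to `"s3://test_s3_bucket"`: the bare-bucket form is the case that previously produced an empty dataset name.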
(airflow) branch main updated: Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. (#39235)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new d529ec8d45 Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. (#39235) d529ec8d45 is described below commit d529ec8d4572b4b9e97344974b2aa960c8a90ae6 Author: Jakub Dardzinski AuthorDate: Wed May 15 10:24:00 2024 +0200 Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. (#39235) Make `max_workers` configurable. Signed-off-by: Jakub Dardzinski --- airflow/providers/openlineage/conf.py | 7 +++ airflow/providers/openlineage/plugins/listener.py | 5 +++-- .../providers/openlineage/plugins/test_listener.py | 21 + 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index d43806abca..23e663f67e 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -104,3 +104,10 @@ def is_disabled() -> bool: # Check if both 'transport' and 'config_path' are not present and also # if legacy 'OPENLINEAGE_URL' environment variables is not set return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == "" + + +@cache +def dag_state_change_process_pool_size() -> int: +"""[openlineage] dag_state_change_process_pool_size.""" +option = conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback=1) +return option diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 9067d53f69..73f8c8c79e 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -17,7 +17,7 @@ from __future__ import annotations import logging -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor from datetime import datetime from 
typing import TYPE_CHECKING @@ -25,6 +25,7 @@ from openlineage.client.serde import Serde from airflow import __version__ as airflow_version from airflow.listeners import hookimpl +from airflow.providers.openlineage import conf from airflow.providers.openlineage.extractors import ExtractorManager from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState from airflow.providers.openlineage.utils.utils import ( @@ -281,7 +282,7 @@ class OpenLineageListener: @property def executor(self): if not self._executor: -self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_") +self._executor = ProcessPoolExecutor(max_workers=conf.dag_state_change_process_pool_size()) return self._executor @hookimpl diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py index fa651de1b2..d9fbb0dfd3 100644 --- a/tests/providers/openlineage/plugins/test_listener.py +++ b/tests/providers/openlineage/plugins/test_listener.py @@ -526,6 +526,27 @@ def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_ope listener.adapter.complete_task.assert_not_called() +@pytest.mark.parametrize( +"max_workers,expected", +[ +(None, 1), +("8", 8), +], +) +@mock.patch("airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor", autospec=True) +def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_executor, max_workers, expected): +"""mock ProcessPoolExecutor and check if conf.dag_state_change_process_pool_size is applied to max_workers""" +listener = OpenLineageListener() +# mock ProcessPoolExecutor class +try: +with conf_vars({("openlineage", "dag_state_change_process_pool_size"): max_workers}): +listener.on_dag_run_running(mock.MagicMock(), None) +mock_executor.assert_called_once_with(max_workers=expected) +mock_executor.return_value.submit.assert_called_once() +finally: +conf.dag_state_change_process_pool_size.cache_clear() + + class 
TestOpenLineageSelectiveEnable: def setup_method(self): self.dag = DAG(
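The new option's fallback behaviour can be sketched as follows. `get_option` is a hypothetical stand-in for Airflow's `conf.getint` lookup; the parametrized test in the diff pins down the contract (`None → 1`, `"8" → 8`):

```python
from typing import Callable, Optional

def dag_state_change_process_pool_size(get_option: Callable[[str, str], Optional[str]]) -> int:
    """Resolve [openlineage] dag_state_change_process_pool_size.

    The listener previously used ThreadPoolExecutor(max_workers=8); the
    ProcessPoolExecutor defaults to a single worker unless configured,
    since each worker is a full OS process rather than a thread.
    """
    raw = get_option("openlineage", "dag_state_change_process_pool_size")
    return int(raw) if raw is not None else 1
```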
(airflow) branch main updated: chore: Add more OpenLineage logs to facilitate debugging (#39136)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 8bc6c32366 chore: Add more OpenLineage logs to facilitate debugging (#39136) 8bc6c32366 is described below commit 8bc6c32366e723c897c0c4be3b3026c61314b519 Author: Kacper Muda AuthorDate: Mon May 13 18:54:48 2024 +0200 chore: Add more OpenLineage logs to facilitate debugging (#39136) Signed-off-by: Kacper Muda --- airflow/providers/openlineage/conf.py | 17 +++--- airflow/providers/openlineage/extractors/base.py | 7 +++ airflow/providers/openlineage/extractors/bash.py | 4 ++ .../providers/openlineage/extractors/manager.py| 8 ++- airflow/providers/openlineage/extractors/python.py | 5 ++ airflow/providers/openlineage/plugins/adapter.py | 17 +- airflow/providers/openlineage/plugins/listener.py | 59 -- airflow/providers/openlineage/provider.yaml| 2 +- tests/providers/openlineage/test_conf.py | 69 ++ 9 files changed, 172 insertions(+), 16 deletions(-) diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index 4ca42eedfd..d43806abca 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -26,6 +26,10 @@ from airflow.configuration import conf _CONFIG_SECTION = "openlineage" +def _is_true(arg: Any) -> bool: +return str(arg).lower().strip() in ("true", "1", "t") + + @cache def config_path(check_legacy_env_var: bool = True) -> str: """[openlineage] config_path.""" @@ -41,7 +45,8 @@ def is_source_enabled() -> bool: option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="") if not option: option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "") -return option.lower() not in ("true", "1", "t") +# when disable_source_code is True, is_source_enabled() should be False +return not _is_true(option) @cache @@ -53,7 +58,9 @@ def 
disabled_operators() -> set[str]: @cache def selective_enable() -> bool: -return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False) +"""[openlineage] selective_enable.""" +option = conf.get(_CONFIG_SECTION, "selective_enable", fallback="") +return _is_true(option) @cache @@ -85,11 +92,7 @@ def transport() -> dict[str, Any]: @cache def is_disabled() -> bool: -"""[openlineage] disabled + some extra checks.""" - -def _is_true(val): -return str(val).lower().strip() in ("true", "1", "t") - +"""[openlineage] disabled + check if any configuration is present.""" option = conf.get(_CONFIG_SECTION, "disabled", fallback="") if _is_true(option): return True diff --git a/airflow/providers/openlineage/extractors/base.py b/airflow/providers/openlineage/extractors/base.py index 2f5f957b5b..6fa805b6fb 100644 --- a/airflow/providers/openlineage/extractors/base.py +++ b/airflow/providers/openlineage/extractors/base.py @@ -87,6 +87,9 @@ class DefaultExtractor(BaseExtractor): def _execute_extraction(self) -> OperatorLineage | None: # OpenLineage methods are optional - if there's no method, return None try: +self.log.debug( +"Trying to execute `get_openlineage_facets_on_start` for %s.", self.operator.task_type +) return self._get_openlineage_facets(self.operator.get_openlineage_facets_on_start) # type: ignore except ImportError: self.log.error( @@ -105,9 +108,13 @@ class DefaultExtractor(BaseExtractor): if task_instance.state == TaskInstanceState.FAILED: on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None) if on_failed and callable(on_failed): +self.log.debug( +"Executing `get_openlineage_facets_on_failure` for %s.", self.operator.task_type +) return self._get_openlineage_facets(on_failed, task_instance) on_complete = getattr(self.operator, "get_openlineage_facets_on_complete", None) if on_complete and callable(on_complete): +self.log.debug("Executing `get_openlineage_facets_on_complete` for %s.", self.operator.task_type) return 
self._get_openlineage_facets(on_complete, task_instance) return self.extract() diff --git a/airflow/providers/ope
(airflow) branch main updated (58509f52a3 -> 02ce7f1f58)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 58509f52a3 Fix typo in deferrable docs (#39494) add 02ce7f1f58 openlineage: notify that logged exception was caught (#39493) No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/utils/utils.py | 3 +++ 1 file changed, 3 insertions(+)
(airflow) 01/01: openlineage: notify that logged exception was caught
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch notify-exception-caught in repository https://gitbox.apache.org/repos/asf/airflow.git commit 0e81e140899e42f010049c912febdde93c178f62 Author: Maciej Obuchowski AuthorDate: Wed May 8 15:40:06 2024 +0200 openlineage: notify that logged exception was caught Signed-off-by: Maciej Obuchowski --- airflow/providers/openlineage/utils/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index ad1f3b0951..b9cd385cc5 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -367,6 +367,9 @@ def print_warning(log): try: return f(*args, **kwargs) except Exception as e: +log.warning( +"Note: the exception below was caught and is logged for visibility only; no OpenLineage events were emitted. The task itself completed successfully despite the missing OL events." +) log.warning(e) return wrapper
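A condensed, runnable form of the `print_warning` decorator this one-line commit touches (sketched from the hunk, with the warning text shortened):

```python
import functools
import logging

def print_warning(log: logging.Logger):
    """Decorator that swallows exceptions from the wrapped callable,
    logging them as warnings so lineage collection never fails a task."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except Exception as e:
                # Caught deliberately: the task keeps running, but no
                # OpenLineage events are emitted for it.
                log.warning("Note: exception below is being caught; it is logged for visibility.")
                log.warning(e)
        return wrapper
    return decorator
```

A wrapped function that raises simply returns `None`; one that succeeds is unaffected.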
(airflow) branch notify-exception-caught created (now 0e81e14089)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch notify-exception-caught in repository https://gitbox.apache.org/repos/asf/airflow.git at 0e81e14089 openlineage: notify that logged exception was caught This branch includes the following new commits: new 0e81e14089 openlineage: notify that logged exception was caught The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(airflow) branch main updated: openlineage, snowflake: do not run external queries for Snowflake (#39113)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new ecc5af70eb openlineage, snowflake: do not run external queries for Snowflake (#39113) ecc5af70eb is described below commit ecc5af70ebd845c873f30fa7ef85790edbf3351c Author: Maciej Obuchowski AuthorDate: Mon Apr 22 15:05:10 2024 +0200 openlineage, snowflake: do not run external queries for Snowflake (#39113) Signed-off-by: Maciej Obuchowski --- airflow/providers/common/sql/operators/sql.py | 9 +++ airflow/providers/openlineage/sqlparser.py | 61 +++ airflow/providers/openlineage/utils/utils.py | 5 ++ airflow/providers/snowflake/hooks/snowflake.py | 24 ++-- .../providers/snowflake/hooks/snowflake_sql_api.py | 8 +-- .../amazon/aws/operators/test_redshift_sql.py | 6 +- tests/providers/snowflake/hooks/test_snowflake.py | 37 +--- .../snowflake/hooks/test_snowflake_sql_api.py | 32 +++--- .../snowflake/operators/test_snowflake_sql.py | 69 -- 9 files changed, 128 insertions(+), 123 deletions(-) diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py index 1fd22b86b7..ea791992d5 100644 --- a/airflow/providers/common/sql/operators/sql.py +++ b/airflow/providers/common/sql/operators/sql.py @@ -309,6 +309,14 @@ class SQLExecuteQueryOperator(BaseSQLOperator): hook = self.get_db_hook() +try: +from airflow.providers.openlineage.utils.utils import should_use_external_connection + +use_external_connection = should_use_external_connection(hook) +except ImportError: +# OpenLineage provider release < 1.8.0 - we always use connection +use_external_connection = True + connection = hook.get_connection(getattr(hook, hook.conn_name_attr)) try: database_info = hook.get_openlineage_database_info(connection) @@ -334,6 +342,7 @@ class SQLExecuteQueryOperator(BaseSQLOperator): database_info=database_info, 
database=self.database, sqlalchemy_engine=hook.get_sqlalchemy_engine(), +use_connection=use_external_connection, ) return operator_lineage diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py index c27dedc53c..f181ff8cce 100644 --- a/airflow/providers/openlineage/sqlparser.py +++ b/airflow/providers/openlineage/sqlparser.py @@ -29,6 +29,7 @@ from openlineage.client.facet import ( ExtractionErrorRunFacet, SqlJobFacet, ) +from openlineage.client.run import Dataset from openlineage.common.sql import DbTableMeta, SqlMeta, parse from airflow.providers.openlineage.extractors.base import OperatorLineage @@ -40,7 +41,6 @@ from airflow.providers.openlineage.utils.sql import ( from airflow.typing_compat import TypedDict if TYPE_CHECKING: -from openlineage.client.run import Dataset from sqlalchemy.engine import Engine from airflow.hooks.base import BaseHook @@ -104,6 +104,18 @@ class DatabaseInfo: normalize_name_method: Callable[[str], str] = default_normalize_name_method +def from_table_meta( +table_meta: DbTableMeta, database: str | None, namespace: str, is_uppercase: bool +) -> Dataset: +if table_meta.database: +name = table_meta.qualified_name +elif database: +name = f"{database}.{table_meta.schema}.{table_meta.name}" +else: +name = f"{table_meta.schema}.{table_meta.name}" +return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper()) + + class SQLParser: """Interface for openlineage-sql. 
@@ -117,7 +129,7 @@ class SQLParser: def parse(self, sql: list[str] | str) -> SqlMeta | None: """Parse a single or a list of SQL statements.""" -return parse(sql=sql, dialect=self.dialect) +return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema) def parse_table_schemas( self, @@ -156,6 +168,23 @@ class SQLParser: else None, ) +def get_metadata_from_parser( +self, +inputs: list[DbTableMeta], +outputs: list[DbTableMeta], +database_info: DatabaseInfo, +namespace: str = DEFAULT_NAMESPACE, +database: str | None = None, +) -> tuple[list[Dataset], ...]: +database = database if database else database_info.database +return [ +from_table_meta(dataset, database, namespace, database_info.is_uppercase_names) +for dataset in inputs +], [ +from_table_meta(dataset, database, namespace, database_info.is_uppercase_names) +
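The naming rules that the restored `from_table_meta` encodes, as a standalone sketch. `TableMeta` is a minimal stand-in for `openlineage.common.sql.DbTableMeta`, and a plain string replaces the OpenLineage `Dataset` name:

```python
from __future__ import annotations
from dataclasses import dataclass

@dataclass
class TableMeta:
    """Minimal stand-in for openlineage.common.sql.DbTableMeta."""
    name: str
    schema: str | None = None
    database: str | None = None

    @property
    def qualified_name(self) -> str:
        return f"{self.database}.{self.schema}.{self.name}"

def dataset_name(meta: TableMeta, database: str | None, is_uppercase: bool) -> str:
    """Prefer the table's own database, then the hook-level fallback
    database, then schema.name alone; upper-case the result for engines
    whose catalogs store upper-case identifiers (is_uppercase_names)."""
    if meta.database:
        name = meta.qualified_name
    elif database:
        name = f"{database}.{meta.schema}.{meta.name}"
    else:
        name = f"{meta.schema}.{meta.name}"
    return name.upper() if is_uppercase else name
```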
(airflow) branch snowflake-openlineage-dontuseexternalconnection updated (e94fb0809b -> 3b26f586a7)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch snowflake-openlineage-dontuseexternalconnection in repository https://gitbox.apache.org/repos/asf/airflow.git omit e94fb0809b openlineage, snowflake: do not run queries for Snowflake add 3b26f586a7 openlineage, snowflake: do not run external queries for Snowflake No new revisions were added by this update. Summary of changes: tests/providers/amazon/aws/operators/test_redshift_sql.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-)
(airflow) branch main updated: fix: Use prefixes instead of all file paths for OpenLineage datasets in GCSDeleteObjectsOperator (#39059)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 17e60b0a2b fix: Use prefixes instead of all file paths for OpenLineage datasets in GCSDeleteObjectsOperator (#39059) 17e60b0a2b is described below commit 17e60b0a2b640a6974eeecca0765e600817cd097 Author: Kacper Muda AuthorDate: Thu Apr 18 17:12:48 2024 +0200 fix: Use prefixes instead of all file paths for OpenLineage datasets in GCSDeleteObjectsOperator (#39059) Signed-off-by: Kacper Muda --- airflow/providers/google/cloud/operators/gcs.py| 34 +++- tests/providers/google/cloud/operators/test_gcs.py | 61 +- 2 files changed, 58 insertions(+), 37 deletions(-) diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index 6c72378a43..f4c8ce932e 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -297,7 +297,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator): *, bucket_name: str, objects: list[str] | None = None, -prefix: str | None = None, +prefix: str | list[str] | None = None, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -309,12 +309,14 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator): self.impersonation_chain = impersonation_chain if objects is None and prefix is None: -err_message = "(Task {task_id}) Either object or prefix should be set. Both are None.".format( +err_message = "(Task {task_id}) Either objects or prefix should be set. Both are None.".format( **kwargs ) raise ValueError(err_message) +if objects is not None and prefix is not None: +err_message = "(Task {task_id}) Objects or prefix should be set. 
Both provided.".format(**kwargs) +raise ValueError(err_message) -self._objects: list[str] = [] super().__init__(**kwargs) def execute(self, context: Context) -> None: @@ -324,15 +326,14 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator): ) if self.objects is not None: -self._objects = self.objects +objects = self.objects else: -self._objects = hook.list(bucket_name=self.bucket_name, prefix=self.prefix) -self.log.info("Deleting %s objects from %s", len(self._objects), self.bucket_name) -for object_name in self._objects: +objects = hook.list(bucket_name=self.bucket_name, prefix=self.prefix) +self.log.info("Deleting %s objects from %s", len(objects), self.bucket_name) +for object_name in objects: hook.delete(bucket_name=self.bucket_name, object_name=object_name) -def get_openlineage_facets_on_complete(self, task_instance): -"""Implement on_complete as execute() resolves object names.""" +def get_openlineage_facets_on_start(self): from openlineage.client.facet import ( LifecycleStateChange, LifecycleStateChangeDatasetFacet, @@ -342,8 +343,17 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator): from airflow.providers.openlineage.extractors import OperatorLineage -if not self._objects: -return OperatorLineage() +objects = [] +if self.objects is not None: +objects = self.objects +elif self.prefix is not None: +prefixes = [self.prefix] if isinstance(self.prefix, str) else self.prefix +for pref in prefixes: +# Use parent if not a file (dot not in name) and not a dir (ends with slash) +if "." 
not in pref.split("/")[-1] and not pref.endswith("/"): +pref = Path(pref).parent.as_posix() +pref = "/" if pref in (".", "", "/") else pref.rstrip("/") +objects.append(pref) bucket_url = f"gs://{self.bucket_name}" input_datasets = [ @@ -360,7 +370,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator): ) }, ) -for object_name in self._objects +for object_name in objects ] return OperatorLineage(inputs=input_datasets) diff --git a/tests/providers/google/cloud/operators/test_gcs.py b/tests/providers/google/cloud/operators/test_gcs.py index 6236aa5f23..0024ad6407 100644 --- a/tests/providers/google/cloud/operators/te
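The parent-directory heuristic described by the comment in the hunk ("Use parent if not a file ... and not a dir"), extracted into a standalone function. A sketch of the logic, not the operator method itself:

```python
from pathlib import Path

def normalize_prefix(pref: str) -> str:
    """Collapse a GCS prefix to the dataset name the fix emits: keep
    file-looking prefixes (dot in the last segment) and explicit
    directories (trailing slash) as-is, otherwise fall back to the
    parent directory; the bucket root becomes "/"."""
    if "." not in pref.split("/")[-1] and not pref.endswith("/"):
        pref = Path(pref).parent.as_posix()
    return "/" if pref in (".", "", "/") else pref.rstrip("/")
```

So `"data/2024/part"` (a partial object name) maps to the directory `"data/2024"`, while a single bare segment maps all the way to the bucket root.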
(airflow) branch main updated: fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 927e3643c2 fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058) 927e3643c2 is described below commit 927e3643c2f901a3ac85f8dc94541ba83b3c6755 Author: Kacper Muda AuthorDate: Thu Apr 18 16:11:14 2024 +0200 fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058) Signed-off-by: Kacper Muda --- .../google/cloud/transfers/gcs_to_bigquery.py | 8 +- .../providers/google/cloud/transfers/gcs_to_gcs.py | 34 ++-- .../google/cloud/transfers/test_gcs_to_gcs.py | 207 ++--- 3 files changed, 165 insertions(+), 84 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 03aefcb8ad..3899048dc4 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -749,7 +749,6 @@ class GCSToBigQueryOperator(BaseOperator): ) from openlineage.client.run import Dataset -from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url from airflow.providers.google.cloud.utils.openlineage import ( get_facets_from_bq_table, get_identity_column_lineage_facet, @@ -766,8 +765,7 @@ class GCSToBigQueryOperator(BaseOperator): "schema": output_dataset_facets["schema"], } input_datasets = [] -for uri in sorted(self.source_uris): -bucket, blob = _parse_gcs_url(uri) +for blob in sorted(self.source_objects): additional_facets = {} if "*" in blob: @@ -777,7 +775,7 @@ class GCSToBigQueryOperator(BaseOperator): "symlink": SymlinksDatasetFacet( identifiers=[ SymlinksDatasetFacetIdentifiers( -namespace=f"gs://{bucket}", name=blob, type="file" +namespace=f"gs://{self.bucket}", name=blob, type="file" ) ] ), @@ 
-788,7 +786,7 @@ class GCSToBigQueryOperator(BaseOperator): blob = "/" dataset = Dataset( -namespace=f"gs://{bucket}", +namespace=f"gs://{self.bucket}", name=blob, facets=merge_dicts(input_dataset_facets, additional_facets), ) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index 8d07dca58b..0b3d330b65 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -234,8 +234,6 @@ class GCSToGCSOperator(BaseOperator): self.source_object_required = source_object_required self.exact_match = exact_match self.match_glob = match_glob -self.resolved_source_objects: set[str] = set() -self.resolved_target_objects: set[str] = set() def execute(self, context: Context): hook = GCSHook( @@ -540,13 +538,6 @@ class GCSToGCSOperator(BaseOperator): self.destination_bucket, destination_object, ) - -self.resolved_source_objects.add(source_object) -if not destination_object: -self.resolved_target_objects.add(source_object) -else: -self.resolved_target_objects.add(destination_object) - hook.rewrite(self.source_bucket, source_object, self.destination_bucket, destination_object) if self.move_object: @@ -559,17 +550,36 @@ class GCSToGCSOperator(BaseOperator): This means we won't have to normalize self.source_object and self.source_objects, destination bucket and so on. """ +from pathlib import Path + from openlineage.client.run import Dataset from airflow.providers.openlineage.extractors import OperatorLineage +def _process_prefix(pref): +if WILDCARD in pref: +pref = pref.split(WILDCARD)[0] +# Use parent if not a file (dot not in name) and not a dir (ends with slash) +if "." not in pref.split("/")[-1] and not pref.endswith("/"): +pref = Path(pref).parent.as_posix() +return ["/" if pref in ("", "/", ".") else pref.rstrip("/")] # Adjust root path + +inputs = [] +for pref
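The companion `_process_prefix` in GCSToGCSOperator adds one step on top of the same heuristic: truncate at the first wildcard before deciding. A standalone sketch (`WILDCARD` is `"*"` in the transfer module):

```python
from pathlib import Path

WILDCARD = "*"

def process_prefix(pref: str) -> list[str]:
    """Map a (possibly wildcarded) source prefix to the single lineage
    dataset name: cut at the wildcard, fall back to the parent directory
    for partial path segments, and map the bucket root to "/"."""
    if WILDCARD in pref:
        pref = pref.split(WILDCARD)[0]
    if "." not in pref.split("/")[-1] and not pref.endswith("/"):
        pref = Path(pref).parent.as_posix()
    return ["/" if pref in ("", "/", ".") else pref.rstrip("/")]
```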
(airflow) branch snowflake-openlineage-dontuseexternalconnection updated (f1fa905b4f -> e94fb0809b)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch snowflake-openlineage-dontuseexternalconnection in repository https://gitbox.apache.org/repos/asf/airflow.git discard f1fa905b4f openlineage, snowflake: do not run queries for Snowflake new e94fb0809b openlineage, snowflake: do not run queries for Snowflake The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. Summary of changes: airflow/providers/common/sql/operators/sql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
(airflow) 01/01: openlineage, snowflake: do not run queries for Snowflake
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch snowflake-openlineage-dontuseexternalconnection in repository https://gitbox.apache.org/repos/asf/airflow.git commit e94fb0809b59ed37ab0dc1400b9ab57a9413c38a Author: Maciej Obuchowski AuthorDate: Thu Apr 18 00:21:18 2024 +0200 openlineage, snowflake: do not run queries for Snowflake Signed-off-by: Maciej Obuchowski --- airflow/providers/common/sql/operators/sql.py | 9 +++ airflow/providers/openlineage/sqlparser.py | 61 +++ airflow/providers/openlineage/utils/utils.py | 5 ++ airflow/providers/snowflake/hooks/snowflake.py | 24 ++-- .../providers/snowflake/hooks/snowflake_sql_api.py | 8 +-- tests/providers/snowflake/hooks/test_snowflake.py | 37 +--- .../snowflake/hooks/test_snowflake_sql_api.py | 32 +++--- .../snowflake/operators/test_snowflake_sql.py | 69 -- 8 files changed, 124 insertions(+), 121 deletions(-) diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py index 1fd22b86b7..ea791992d5 100644 --- a/airflow/providers/common/sql/operators/sql.py +++ b/airflow/providers/common/sql/operators/sql.py @@ -309,6 +309,14 @@ class SQLExecuteQueryOperator(BaseSQLOperator): hook = self.get_db_hook() +try: +from airflow.providers.openlineage.utils.utils import should_use_external_connection + +use_external_connection = should_use_external_connection(hook) +except ImportError: +# OpenLineage provider release < 1.8.0 - we always use connection +use_external_connection = True + connection = hook.get_connection(getattr(hook, hook.conn_name_attr)) try: database_info = hook.get_openlineage_database_info(connection) @@ -334,6 +342,7 @@ class SQLExecuteQueryOperator(BaseSQLOperator): database_info=database_info, database=self.database, sqlalchemy_engine=hook.get_sqlalchemy_engine(), +use_connection=use_external_connection, ) return operator_lineage diff --git a/airflow/providers/openlineage/sqlparser.py 
b/airflow/providers/openlineage/sqlparser.py index c27dedc53c..f181ff8cce 100644 --- a/airflow/providers/openlineage/sqlparser.py +++ b/airflow/providers/openlineage/sqlparser.py @@ -29,6 +29,7 @@ from openlineage.client.facet import ( ExtractionErrorRunFacet, SqlJobFacet, ) +from openlineage.client.run import Dataset from openlineage.common.sql import DbTableMeta, SqlMeta, parse from airflow.providers.openlineage.extractors.base import OperatorLineage @@ -40,7 +41,6 @@ from airflow.providers.openlineage.utils.sql import ( from airflow.typing_compat import TypedDict if TYPE_CHECKING: -from openlineage.client.run import Dataset from sqlalchemy.engine import Engine from airflow.hooks.base import BaseHook @@ -104,6 +104,18 @@ class DatabaseInfo: normalize_name_method: Callable[[str], str] = default_normalize_name_method +def from_table_meta( +table_meta: DbTableMeta, database: str | None, namespace: str, is_uppercase: bool +) -> Dataset: +if table_meta.database: +name = table_meta.qualified_name +elif database: +name = f"{database}.{table_meta.schema}.{table_meta.name}" +else: +name = f"{table_meta.schema}.{table_meta.name}" +return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper()) + + class SQLParser: """Interface for openlineage-sql. 
@@ -117,7 +129,7 @@ class SQLParser: def parse(self, sql: list[str] | str) -> SqlMeta | None: """Parse a single or a list of SQL statements.""" -return parse(sql=sql, dialect=self.dialect) +return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema) def parse_table_schemas( self, @@ -156,6 +168,23 @@ class SQLParser: else None, ) +def get_metadata_from_parser( +self, +inputs: list[DbTableMeta], +outputs: list[DbTableMeta], +database_info: DatabaseInfo, +namespace: str = DEFAULT_NAMESPACE, +database: str | None = None, +) -> tuple[list[Dataset], ...]: +database = database if database else database_info.database +return [ +from_table_meta(dataset, database, namespace, database_info.is_uppercase_names) +for dataset in inputs +], [ +from_table_meta(dataset, database, namespace, database_info.is_uppercase_names) +for dataset in outputs +] + def attach_column_lineage( self, datasets: list[Dataset], database: str | None, parse_result: SqlMeta ) -> None: @@ -204,6 +233,7 @@ class SQLPa
(airflow) branch snowflake-openlineage-dontuseexternalconnection created (now f1fa905b4f)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch snowflake-openlineage-dontuseexternalconnection
in repository https://gitbox.apache.org/repos/asf/airflow.git

      at f1fa905b4f openlineage, snowflake: do not run queries for Snowflake

This branch includes the following new commits:

     new f1fa905b4f openlineage, snowflake: do not run queries for Snowflake

The 1 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails. The revisions listed as "add" were
already present in the repository and have only been added to this reference.
(airflow) 01/01: openlineage, snowflake: do not run queries for Snowflake
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch snowflake-openlineage-dontuseexternalconnection in repository https://gitbox.apache.org/repos/asf/airflow.git commit f1fa905b4f0177e5564cf1de0802d70f6b323ae9 Author: Maciej Obuchowski AuthorDate: Thu Apr 18 00:21:18 2024 +0200 openlineage, snowflake: do not run queries for Snowflake Signed-off-by: Maciej Obuchowski --- airflow/providers/common/sql/operators/sql.py | 9 +++ airflow/providers/openlineage/sqlparser.py | 61 +++ airflow/providers/openlineage/utils/utils.py | 5 ++ airflow/providers/snowflake/hooks/snowflake.py | 24 ++-- .../providers/snowflake/hooks/snowflake_sql_api.py | 8 +-- tests/providers/snowflake/hooks/test_snowflake.py | 37 +--- .../snowflake/hooks/test_snowflake_sql_api.py | 32 +++--- .../snowflake/operators/test_snowflake_sql.py | 69 -- 8 files changed, 124 insertions(+), 121 deletions(-) diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py index 1fd22b86b7..5f6c735e68 100644 --- a/airflow/providers/common/sql/operators/sql.py +++ b/airflow/providers/common/sql/operators/sql.py @@ -309,6 +309,14 @@ class SQLExecuteQueryOperator(BaseSQLOperator): hook = self.get_db_hook() +try: +from airflow.providers.openlineage.utils.utils import should_use_external_connection + +use_external_connection = should_use_external_connection(hook) +except ImportError: +# OpenLineage provider release < 1.8.0 +use_external_connection = False + connection = hook.get_connection(getattr(hook, hook.conn_name_attr)) try: database_info = hook.get_openlineage_database_info(connection) @@ -334,6 +342,7 @@ class SQLExecuteQueryOperator(BaseSQLOperator): database_info=database_info, database=self.database, sqlalchemy_engine=hook.get_sqlalchemy_engine(), +use_connection=use_external_connection, ) return operator_lineage diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py 
index c27dedc53c..f181ff8cce 100644 --- a/airflow/providers/openlineage/sqlparser.py +++ b/airflow/providers/openlineage/sqlparser.py @@ -29,6 +29,7 @@ from openlineage.client.facet import ( ExtractionErrorRunFacet, SqlJobFacet, ) +from openlineage.client.run import Dataset from openlineage.common.sql import DbTableMeta, SqlMeta, parse from airflow.providers.openlineage.extractors.base import OperatorLineage @@ -40,7 +41,6 @@ from airflow.providers.openlineage.utils.sql import ( from airflow.typing_compat import TypedDict if TYPE_CHECKING: -from openlineage.client.run import Dataset from sqlalchemy.engine import Engine from airflow.hooks.base import BaseHook @@ -104,6 +104,18 @@ class DatabaseInfo: normalize_name_method: Callable[[str], str] = default_normalize_name_method +def from_table_meta( +table_meta: DbTableMeta, database: str | None, namespace: str, is_uppercase: bool +) -> Dataset: +if table_meta.database: +name = table_meta.qualified_name +elif database: +name = f"{database}.{table_meta.schema}.{table_meta.name}" +else: +name = f"{table_meta.schema}.{table_meta.name}" +return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper()) + + class SQLParser: """Interface for openlineage-sql. 
@@ -117,7 +129,7 @@ class SQLParser: def parse(self, sql: list[str] | str) -> SqlMeta | None: """Parse a single or a list of SQL statements.""" -return parse(sql=sql, dialect=self.dialect) +return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema) def parse_table_schemas( self, @@ -156,6 +168,23 @@ class SQLParser: else None, ) +def get_metadata_from_parser( +self, +inputs: list[DbTableMeta], +outputs: list[DbTableMeta], +database_info: DatabaseInfo, +namespace: str = DEFAULT_NAMESPACE, +database: str | None = None, +) -> tuple[list[Dataset], ...]: +database = database if database else database_info.database +return [ +from_table_meta(dataset, database, namespace, database_info.is_uppercase_names) +for dataset in inputs +], [ +from_table_meta(dataset, database, namespace, database_info.is_uppercase_names) +for dataset in outputs +] + def attach_column_lineage( self, datasets: list[Dataset], database: str | None, parse_result: SqlMeta ) -> None: @@ -204,6 +233,7 @@ class SQLParser: database_info: DatabaseInfo,
(airflow) branch main updated: fix: OpenLineage datasets in GCSTimeSpanFileTransformOperator (#39064)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 066708352e fix: OpenLineage datasets in GCSTimeSpanFileTransformOperator (#39064) 066708352e is described below commit 066708352e6a6a06f213b65324e982f582019b8e Author: Kacper Muda AuthorDate: Wed Apr 17 21:14:26 2024 +0200 fix: OpenLineage datasets in GCSTimeSpanFileTransformOperator (#39064) Signed-off-by: Kacper Muda --- airflow/providers/google/cloud/operators/gcs.py| 69 -- tests/providers/google/cloud/operators/test_gcs.py | 105 +++-- 2 files changed, 115 insertions(+), 59 deletions(-) diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index c311c8b4ed..6c72378a43 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -774,8 +774,8 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator): self.upload_continue_on_fail = upload_continue_on_fail self.upload_num_attempts = upload_num_attempts -self._source_object_names: list[str] = [] -self._destination_object_names: list[str] = [] +self._source_prefix_interp: str | None = None +self._destination_prefix_interp: str | None = None def execute(self, context: Context) -> list[str]: # Define intervals and prefixes. 
@@ -803,11 +803,11 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator): timespan_start = timespan_start.in_timezone(timezone.utc) timespan_end = timespan_end.in_timezone(timezone.utc) -source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix( +self._source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix( self.source_prefix, timespan_start, ) -destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix( +self._destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix( self.destination_prefix, timespan_start, ) @@ -828,9 +828,9 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator): ) # Fetch list of files. -self._source_object_names = source_hook.list_by_timespan( +blobs_to_transform = source_hook.list_by_timespan( bucket_name=self.source_bucket, -prefix=source_prefix_interp, +prefix=self._source_prefix_interp, timespan_start=timespan_start, timespan_end=timespan_end, ) @@ -840,7 +840,7 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator): temp_output_dir_path = Path(temp_output_dir) # TODO: download in parallel. -for blob_to_transform in self._source_object_names: +for blob_to_transform in blobs_to_transform: destination_file = temp_input_dir_path / blob_to_transform destination_file.parent.mkdir(parents=True, exist_ok=True) try: @@ -877,6 +877,8 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator): self.log.info("Transformation succeeded. Output temporarily located at %s", temp_output_dir_path) +files_uploaded = [] + # TODO: upload in parallel. 
for upload_file in temp_output_dir_path.glob("**/*"): if upload_file.is_dir(): @@ -884,8 +886,8 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator): upload_file_name = str(upload_file.relative_to(temp_output_dir_path)) -if self.destination_prefix is not None: -upload_file_name = f"{destination_prefix_interp}/{upload_file_name}" +if self._destination_prefix_interp is not None: +upload_file_name = f"{self._destination_prefix_interp.rstrip('/')}/{upload_file_name}" self.log.info("Uploading file %s to %s", upload_file, upload_file_name) @@ -897,35 +899,46 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator): chunk_size=self.chunk_size, num_max_attempts=self.upload_num_attempts, ) - self._destination_object_names.append(str(upload_file_name)) +files_uploaded.append(str(upload_file_name)) except GoogleCloudError: if not self.upload_continue_on_fail: raise -return self._destination_object_names +return files_uploaded def get_openlineage_facets_on_complete(self, task_in
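The upload hunk above changes the destination key construction in two ways: it gates on the *interpolated* prefix rather than the raw `destination_prefix`, and it strips any trailing slash before joining, so a prefix like `out/` no longer produces a double slash in the object name. A minimal sketch of the new join rule (the function name is illustrative, not part of the operator):

```python
from typing import Optional

def join_destination(prefix_interp: Optional[str], file_name: str) -> str:
    """Join an interpolated destination prefix with an uploaded file's relative name."""
    if prefix_interp is None:
        return file_name
    # rstrip('/') prevents names like 'out//part-0001' when the prefix ends with a slash
    return f"{prefix_interp.rstrip('/')}/{file_name}"
```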
(airflow) branch main updated: Fix OpenLineage provide plugin examples (#39029)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 09e938a2a7 Fix OpenLineage provide plugin examples (#39029)
09e938a2a7 is described below

commit 09e938a2a76428016747162e53b9e39ecd2ccfbe
Author: Maxim Martynov
AuthorDate: Tue Apr 16 12:04:40 2024 +0300

    Fix OpenLineage provide plugin examples (#39029)
---
 docs/apache-airflow-providers-openlineage/macros.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/apache-airflow-providers-openlineage/macros.rst b/docs/apache-airflow-providers-openlineage/macros.rst
index 3ce285f966..c07d20824e 100644
--- a/docs/apache-airflow-providers-openlineage/macros.rst
+++ b/docs/apache-airflow-providers-openlineage/macros.rst
@@ -42,9 +42,9 @@ For example, ``SparkSubmitOperator`` can be set up like this:
         application="/script.py",
         conf={
             # separated components
-            "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineagePlugin.lineage_job_namespace() }}",
-            "spark.openlineage.parentJobName": "{{ macros.OpenLineagePlugin.lineage_job_name(task_instance) }}",
-            "spark.openlineage.parentRunId": "{{ macros.OpenLineagePlugin.lineage_run_id(task_instance) }}",
+            "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
+            "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
+            "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
         },
     )
(airflow) branch main updated: Give `on_task_instance_failed` access to the error that caused the failure (#38155)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 53dcbce745 Give `on_task_instance_failed` access to the error that caused the failure (#38155) 53dcbce745 is described below commit 53dcbce745bbb8ae4e48a55a58daa36d8b5173bd Author: Raphaël Vandon AuthorDate: Fri Apr 12 10:54:26 2024 +0200 Give `on_task_instance_failed` access to the error that caused the failure (#38155) * give on_task_instance_failed access to the error that caused the failure * store error in thread-local storage && update sample dag * add sample code to the listeners doc * test that the error is accessible on callback * fix stuff detected by static checks * add _thread_local_data to TaskInstancePydantic as well it's a possible type in handle_failure * mention error recovery mechanism in doc * replace previous solution with a new parameter --- airflow/example_dags/plugins/event_listener.py | 7 +++- airflow/listeners/spec/taskinstance.py | 5 ++- airflow/models/taskinstance.py | 3 +- .../administration-and-deployment/listeners.rst| 42 -- tests/listeners/class_listener.py | 4 ++- tests/listeners/file_write_listener.py | 4 ++- tests/listeners/full_listener.py | 2 +- tests/models/test_taskinstance.py | 16 +++-- 8 files changed, 72 insertions(+), 11 deletions(-) diff --git a/airflow/example_dags/plugins/event_listener.py b/airflow/example_dags/plugins/event_listener.py index 43d9df9727..4b9be307c4 100644 --- a/airflow/example_dags/plugins/event_listener.py +++ b/airflow/example_dags/plugins/event_listener.py @@ -89,7 +89,9 @@ def on_task_instance_success(previous_state: TaskInstanceState, task_instance: T # [START howto_listen_ti_failure_task] @hookimpl -def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: TaskInstance, session): +def on_task_instance_failed( +previous_state: 
TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session +): """ This method is called when task state changes to FAILED. Through callback, parameters like previous_task_state, task_instance object can be accessed. @@ -113,6 +115,8 @@ def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: Ta print(f"Task start:{start_date} end:{end_date} duration:{duration}") print(f"Task:{task} dag:{dag} dagrun:{dagrun}") +if error: +print(f"Failure caused by {error}") # [END howto_listen_ti_failure_task] @@ -146,6 +150,7 @@ def on_dag_run_failed(dag_run: DagRun, msg: str): external_trigger = dag_run.external_trigger print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}") +print(f"Failed with message: {msg}") # [END howto_listen_dagrun_failure_task] diff --git a/airflow/listeners/spec/taskinstance.py b/airflow/listeners/spec/taskinstance.py index 03f0a00478..f012de0aac 100644 --- a/airflow/listeners/spec/taskinstance.py +++ b/airflow/listeners/spec/taskinstance.py @@ -46,6 +46,9 @@ def on_task_instance_success( @hookspec def on_task_instance_failed( -previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None +previous_state: TaskInstanceState | None, +task_instance: TaskInstance, +error: None | str | BaseException, +session: Session | None, ): """Execute when task state changes to FAIL. 
previous_state can be None.""" diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index c3bf25e343..b07aed936d 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1413,6 +1413,7 @@ class TaskInstance(Base, LoggingMixin): cascade="all, delete, delete-orphan", ) note = association_proxy("task_instance_note", "content", creator=_creator_note) + task: Operator | None = None test_mode: bool = False is_trigger_log_context: bool = False @@ -2934,7 +2935,7 @@ class TaskInstance(Base, LoggingMixin): ): """Handle Failure for the TaskInstance.""" get_listener_manager().hook.on_task_instance_failed( -previous_state=TaskInstanceState.RUNNING, task_instance=ti, session=session +previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error, session=session ) if error: diff --git a/docs/apache-airflow/administration-and-deployment/listeners.rst b/docs/apache-airflow/administration-and-deployme
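The new `error` argument in this hook spec means a listener can now inspect the exception (or message) that failed the task instance. A minimal stand-in for the dispatch, without Airflow's pluggy machinery — `MiniListenerManager` and the listener body are illustrative, not Airflow APIs:

```python
class MiniListenerManager:
    """Simplified dispatch loop; Airflow uses pluggy hooks instead."""

    def __init__(self):
        self.listeners = []

    def on_task_instance_failed(self, previous_state, task_instance, error, session):
        for listener in self.listeners:
            listener(previous_state=previous_state, task_instance=task_instance,
                     error=error, session=session)

captured = []

def my_listener(previous_state, task_instance, error, session):
    # Per the new spec, `error` may be None, a string, or a BaseException
    if error:
        captured.append(f"Failure caused by {error}")

manager = MiniListenerManager()
manager.listeners.append(my_listener)
manager.on_task_instance_failed(None, "some_ti", ValueError("boom"), None)
```

Because the parameter is passed by keyword in `handle_failure`, listeners written against the old three-argument spec need updating to accept `error`.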
(airflow) branch main updated: Add lineage_job_namespace and lineage_job_name OpenLineage macros (#38829)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 093ab7e755 Add lineage_job_namespace and lineage_job_name OpenLineage macros (#38829) 093ab7e755 is described below commit 093ab7e7556bad9202e83e9fd6d968c50a5f7cb8 Author: Maxim Martynov AuthorDate: Mon Apr 8 20:17:14 2024 +0300 Add lineage_job_namespace and lineage_job_name OpenLineage macros (#38829) --- airflow/providers/openlineage/plugins/macros.py| 43 .../providers/openlineage/plugins/openlineage.py | 9 +++- airflow/providers/openlineage/utils/utils.py | 2 +- .../macros.rst | 59 ++ tests/providers/openlineage/plugins/test_macros.py | 36 ++--- 5 files changed, 109 insertions(+), 40 deletions(-) diff --git a/airflow/providers/openlineage/plugins/macros.py b/airflow/providers/openlineage/plugins/macros.py index 391b29495f..ddfceb3459 100644 --- a/airflow/providers/openlineage/plugins/macros.py +++ b/airflow/providers/openlineage/plugins/macros.py @@ -26,22 +26,41 @@ if TYPE_CHECKING: from airflow.models import TaskInstance +def lineage_job_namespace(): +""" +Macro function which returns Airflow OpenLineage namespace. + +.. seealso:: +For more information take a look at the guide: +:ref:`howto/macros:openlineage` +""" +return conf.namespace() + + +def lineage_job_name(task_instance: TaskInstance): +""" +Macro function which returns Airflow task name in OpenLineage format (`.`). + +.. seealso:: +For more information take a look at the guide: +:ref:`howto/macros:openlineage` +""" +return get_job_name(task_instance) + + def lineage_run_id(task_instance: TaskInstance): """ -Macro function which returns the generated run id for a given task. +Macro function which returns the generated run id (UUID) for a given task. This can be used to forward the run id from a task to a child run so the job hierarchy is preserved. .. 
seealso:: -For more information on how to use this operator, take a look at the guide: +For more information take a look at the guide: :ref:`howto/macros:openlineage` """ -if TYPE_CHECKING: -assert task_instance.task - return OpenLineageAdapter.build_task_instance_run_id( dag_id=task_instance.dag_id, -task_id=task_instance.task.task_id, +task_id=task_instance.task_id, execution_date=task_instance.execution_date, try_number=task_instance.try_number, ) @@ -56,9 +75,13 @@ def lineage_parent_id(task_instance: TaskInstance): run so the job hierarchy is preserved. Child run can easily create ParentRunFacet from these information. .. seealso:: -For more information on how to use this macro, take a look at the guide: +For more information take a look at the guide: :ref:`howto/macros:openlineage` """ -job_name = get_job_name(task_instance.task) -run_id = lineage_run_id(task_instance) -return f"{conf.namespace()}/{job_name}/{run_id}" +return "/".join( +( +lineage_job_namespace(), +lineage_job_name(task_instance), +lineage_run_id(task_instance), +) +) diff --git a/airflow/providers/openlineage/plugins/openlineage.py b/airflow/providers/openlineage/plugins/openlineage.py index a0be47a499..5927929588 100644 --- a/airflow/providers/openlineage/plugins/openlineage.py +++ b/airflow/providers/openlineage/plugins/openlineage.py @@ -19,7 +19,12 @@ from __future__ import annotations from airflow.plugins_manager import AirflowPlugin from airflow.providers.openlineage import conf from airflow.providers.openlineage.plugins.listener import get_openlineage_listener -from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id +from airflow.providers.openlineage.plugins.macros import ( +lineage_job_name, +lineage_job_namespace, +lineage_parent_id, +lineage_run_id, +) class OpenLineageProviderPlugin(AirflowPlugin): @@ -32,5 +37,5 @@ class OpenLineageProviderPlugin(AirflowPlugin): name = "OpenLineageProviderPlugin" if not conf.is_disabled(): -macros = 
[lineage_run_id, lineage_parent_id] +macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id] listeners = [get_openlineage_listener()] diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index fb2263b90d..1c777aff76 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/provide
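Per the diff, `lineage_parent_id` is now just the composition of the three smaller macros, with the job name following the `<dag_id>.<task_id>` convention shown in the macro docstring. A sketch of the resulting format (free functions standing in for the macros, with illustrative values):

```python
def lineage_job_name(dag_id: str, task_id: str) -> str:
    # Job name convention: <dag_id>.<task_id>
    return f"{dag_id}.{task_id}"

def lineage_parent_id(namespace: str, job_name: str, run_id: str) -> str:
    # Parent id is the three macro values joined with "/"
    return "/".join((namespace, job_name, run_id))
```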
(airflow) branch main updated: fix: try002 for provider openlineage (#38806)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d9737154c fix: try002 for provider openlineage (#38806)
7d9737154c is described below

commit 7d9737154c53fe96fd925a95d670eec36a24c6e3
Author: Sebastian Daum
AuthorDate: Sun Apr 7 19:46:34 2024 +0200

    fix: try002 for provider openlineage (#38806)
---
 airflow/providers/openlineage/utils/utils.py | 2 +-
 pyproject.toml                               | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index a56426d4af..fb2263b90d 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -144,7 +144,7 @@ class InfoJsonEncodable(dict):
     def _include_fields(self):
         if self.includes and self.excludes:
-            raise Exception("Don't use both includes and excludes.")
+            raise ValueError("Don't use both includes and excludes.")
         if self.includes:
             for field in self.includes:
                 if field not in self._fields and hasattr(self.obj, field):
diff --git a/pyproject.toml b/pyproject.toml
index a242673d46..5790943f53 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -403,8 +403,6 @@ combine-as-imports = true
 "airflow/providers/imap/hooks/imap.py" = ["TRY002"]
 # microsoft.azure
 "airflow/providers/microsoft/azure/operators/synapse.py" = ["TRY002"]
-# openlineage
-"airflow/providers/openlineage/utils/utils.py" = ["TRY002"]

 [tool.ruff.lint.flake8-tidy-imports]
 # Disallow all relative imports.
(airflow) branch openlineage-do-not-submit-busyjobs created (now 3c173d596b)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch openlineage-do-not-submit-busyjobs
in repository https://gitbox.apache.org/repos/asf/airflow.git

      at 3c173d596b openlineage: skip sending events if the executor is busy

This branch includes the following new commits:

     new 3c173d596b openlineage: skip sending events if the executor is busy

The 1 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails. The revisions listed as "add" were
already present in the repository and have only been added to this reference.
(airflow) 01/01: openlineage: skip sending events if the executor is busy
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-do-not-submit-busyjobs
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3c173d596bfa17b8432e5c46a9624cbf8f1471e8
Author: Maciej Obuchowski
AuthorDate: Thu Apr 4 21:01:23 2024 +0200

    openlineage: skip sending events if the executor is busy

    Signed-off-by: Maciej Obuchowski
---
 airflow/providers/openlineage/plugins/listener.py | 24 +++
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 25ded6d7f4..a839b83cd7 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,9 +17,10 @@
 from __future__ import annotations

 import logging
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Callable

 from openlineage.client.serde import Serde

@@ -250,7 +251,9 @@
     @property
     def executor(self):
         if not self._executor:
-            self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
+            max_workers = 8
+            self._executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="openlineage_")
+            self._busy_semaphore = threading.Semaphore(max_workers)
         return self._executor

     @hookimpl
@@ -277,6 +280,19 @@
             nominal_end_time=data_interval_end,
         )

+    def executor_submit(self, callable: Callable, *args, **kwargs):
+        def execute():
+            try:
+                callable(*args, **kwargs)
+            finally:
+                self._busy_semaphore.release()
+
+        acquired = self._busy_semaphore.acquire(blocking=False)
+        if not acquired:
+            self.log.debug("Executor is busy; skipping sending OpenLineage event")
+            return
+        self.executor.submit(execute)
+
     @hookimpl
     def on_dag_run_success(self, dag_run: DagRun, msg: str):
         if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
@@ -284,7 +300,7 @@ class OpenLineageListener:
         if not self.executor:
             self.log.debug("Executor have not started before `on_dag_run_success`")
             return
-        self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
+        self.executor_submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)

     @hookimpl
     def on_dag_run_failed(self, dag_run: DagRun, msg: str):
@@ -293,7 +309,7 @@
         if not self.executor:
             self.log.debug("Executor have not started before `on_dag_run_failed`")
             return
-        self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
+        self.executor_submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)

 def get_openlineage_listener() -> OpenLineageListener:
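The pattern in this commit — acquire a non-blocking semaphore permit before submitting to a `ThreadPoolExecutor`, release it when the work finishes — drops events instead of queueing them when all workers are busy. A self-contained sketch of the same technique (class and method names are illustrative, not the listener's API):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class BusyAwareSubmitter:
    """Drop new work instead of queueing it when every worker thread is busy."""

    def __init__(self, max_workers: int = 8):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        # One permit per worker; a failed non-blocking acquire means all are busy.
        self._busy = threading.Semaphore(max_workers)

    def submit(self, fn, *args, **kwargs) -> bool:
        if not self._busy.acquire(blocking=False):
            return False  # all workers busy: skip this piece of work entirely

        def run():
            try:
                fn(*args, **kwargs)
            finally:
                self._busy.release()  # free the permit once the work is done

        self._executor.submit(run)
        return True
```

The design choice here is load shedding: for best-effort telemetry like lineage events, silently skipping under load is preferable to an unbounded executor queue delaying DAG completion.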
(airflow) branch main updated (5337066492 -> ecd69955f9)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

    from 5337066492 Fix grid header rendering (#38720)
     add ecd69955f9 fix: Remove redundant operator information from facets (#38264)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/openlineage/extractors/bash.py   | 22 ++
 .../providers/openlineage/extractors/manager.py    | 17 +
 airflow/providers/openlineage/extractors/python.py | 22 ++
 airflow/providers/openlineage/plugins/facets.py    | 11 +++
 airflow/providers/openlineage/utils/utils.py       | 78 ++
 .../providers/openlineage/extractors/test_bash.py  | 35 +++---
 .../openlineage/extractors/test_python.py          | 33 ++---
 7 files changed, 120 insertions(+), 98 deletions(-)
(airflow) branch main updated: openlineage: add `opt-in` option (#37725)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 714a933479 openlineage: add `opt-in` option (#37725) 714a933479 is described below commit 714a933479f9dc1c3ef5916e43292efc182a0857 Author: Jakub Dardzinski AuthorDate: Tue Mar 26 13:01:44 2024 +0100 openlineage: add `opt-in` option (#37725) * Add `opt-in` option to disable OpenLineage for all DAGs/tasks by default and enable it selectively. Signed-off-by: Jakub Dardzinski * Rename `opt_in` to `selective_enable`. Signed-off-by: Jakub Dardzinski - Signed-off-by: Jakub Dardzinski --- airflow/providers/openlineage/conf.py | 5 + airflow/providers/openlineage/plugins/listener.py | 16 +++ airflow/providers/openlineage/provider.yaml| 8 ++ .../openlineage/utils/selective_enable.py | 87 airflow/providers/openlineage/utils/utils.py | 19 ++- .../guides/user.rst| 56 .../providers/openlineage/plugins/test_listener.py | 158 - .../openlineage/utils/test_selective_enable.py | 72 ++ 8 files changed, 415 insertions(+), 6 deletions(-) diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index ba8ce913c7..4ca42eedfd 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -51,6 +51,11 @@ def disabled_operators() -> set[str]: return set(operator.strip() for operator in option.split(";") if operator.strip()) +@cache +def selective_enable() -> bool: +return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False) + + @cache def custom_extractors() -> set[str]: """[openlineage] extractors.""" diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 0d6b487f22..ba1e5a7906 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -31,6 
+31,7 @@ from airflow.providers.openlineage.utils.utils import ( get_custom_facets, get_job_name, is_operator_disabled, +is_selective_lineage_enabled, print_warning, ) from airflow.stats import Stats @@ -83,6 +84,9 @@ class OpenLineageListener: ) return None +if not is_selective_lineage_enabled(task): +return + @print_warning(self.log) def on_running(): # that's a workaround to detect task running from deferred state @@ -150,6 +154,9 @@ class OpenLineageListener: ) return None +if not is_selective_lineage_enabled(task): +return + @print_warning(self.log) def on_success(): parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id) @@ -202,6 +209,9 @@ class OpenLineageListener: ) return None +if not is_selective_lineage_enabled(task): +return + @print_warning(self.log) def on_failure(): parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id) @@ -255,6 +265,8 @@ class OpenLineageListener: @hookimpl def on_dag_run_running(self, dag_run: DagRun, msg: str): +if not is_selective_lineage_enabled(dag_run.dag): +return data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None self.executor.submit( @@ -267,6 +279,8 @@ class OpenLineageListener: @hookimpl def on_dag_run_success(self, dag_run: DagRun, msg: str): +if not is_selective_lineage_enabled(dag_run.dag): +return if not self.executor: self.log.debug("Executor have not started before `on_dag_run_success`") return @@ -274,6 +288,8 @@ class OpenLineageListener: @hookimpl def on_dag_run_failed(self, dag_run: DagRun, msg: str): +if not is_selective_lineage_enabled(dag_run.dag): +return if not self.executor: self.log.debug("Executor have not started before `on_dag_run_failed`") return diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml index aac9b43111..075f711779 100644 --- 
a/airflow/providers/openlineage/provider.yaml +++ b/airflow/providers/openlineage/provider.yaml @@ -77,6 +77,14 @@ config: example: "airflow.operators.bash.BashOperator;airflow.operators.python.Python
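The commit above adds a cached boolean getter, `selective_enable()`, to the provider's `conf.py`. A minimal standalone sketch of that getter pattern (using a plain dict in place of Airflow's `airflow.configuration.conf` object — the `_FAKE_CONF` store and its string-to-bool parsing are illustrative assumptions, not the provider's API):

```python
from functools import cache

# Hypothetical stand-in for Airflow's configuration store; in the real
# provider this is airflow.configuration.conf with (section, option) lookups.
_FAKE_CONF = {("openlineage", "selective_enable"): "True"}


@cache
def selective_enable() -> bool:
    # Mirrors the shape of conf.getboolean(section, option, fallback=False):
    # read the raw option, fall back to False when it is absent.
    raw = _FAKE_CONF.get(("openlineage", "selective_enable"), "False")
    return raw.strip().lower() in ("1", "t", "true")


print(selective_enable())  # cached after the first call
```

The `@cache` decorator matches the diff's intent: the option is read once per process, so later calls skip the configuration lookup entirely.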
(airflow) branch main updated: fix: disabled_for_operators now stops whole event emission (#38033)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 9c4e333f5b fix: disabled_for_operators now stops whole event emission (#38033) 9c4e333f5b is described below commit 9c4e333f5b7cc6f950f6791500ecd4bad41ba2f9 Author: Kacper Muda AuthorDate: Mon Mar 25 11:50:53 2024 +0100 fix: disabled_for_operators now stops whole event emission (#38033) Signed-off-by: Kacper Muda --- airflow/providers/dbt/cloud/utils/openlineage.py | 6 +- airflow/providers/openlineage/conf.py | 98 ++ airflow/providers/openlineage/extractors/base.py | 30 -- airflow/providers/openlineage/extractors/bash.py | 8 +- .../providers/openlineage/extractors/manager.py| 30 +- airflow/providers/openlineage/extractors/python.py | 8 +- airflow/providers/openlineage/plugins/adapter.py | 28 +- airflow/providers/openlineage/plugins/listener.py | 23 +- airflow/providers/openlineage/plugins/macros.py| 5 +- .../providers/openlineage/plugins/openlineage.py | 19 +- airflow/providers/openlineage/utils/utils.py | 22 +- .../providers/openlineage/extractors/test_base.py | 33 +- .../providers/openlineage/extractors/test_bash.py | 103 +- .../openlineage/extractors/test_python.py | 117 +-- .../providers/openlineage/plugins/test_listener.py | 69 +++- tests/providers/openlineage/plugins/test_macros.py | 4 +- .../openlineage/plugins/test_openlineage.py| 65 +++- .../plugins/test_openlineage_adapter.py| 181 +- tests/providers/openlineage/plugins/test_utils.py | 30 ++ tests/providers/openlineage/test_conf.py | 389 + 20 files changed, 913 insertions(+), 355 deletions(-) diff --git a/airflow/providers/dbt/cloud/utils/openlineage.py b/airflow/providers/dbt/cloud/utils/openlineage.py index f86c77a689..358382c39b 100644 --- a/airflow/providers/dbt/cloud/utils/openlineage.py +++ b/airflow/providers/dbt/cloud/utils/openlineage.py @@ -47,9 +47,9 @@ 
def generate_openlineage_events_from_dbt_cloud_run( """ from openlineage.common.provider.dbt import DbtCloudArtifactProcessor, ParentRunMetadata +from airflow.providers.openlineage.conf import namespace from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.plugins.adapter import ( -_DAG_NAMESPACE, _PRODUCER, OpenLineageAdapter, ) @@ -110,7 +110,7 @@ def generate_openlineage_events_from_dbt_cloud_run( processor = DbtCloudArtifactProcessor( producer=_PRODUCER, -job_namespace=_DAG_NAMESPACE, +job_namespace=namespace(), skip_errors=False, logger=operator.log, manifest=manifest, @@ -130,7 +130,7 @@ def generate_openlineage_events_from_dbt_cloud_run( parent_job = ParentRunMetadata( run_id=parent_run_id, job_name=f"{task_instance.dag_id}.{task_instance.task_id}", -job_namespace=_DAG_NAMESPACE, +job_namespace=namespace(), ) processor.dbt_run_metadata = parent_job diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py new file mode 100644 index 00..ba8ce913c7 --- /dev/null +++ b/airflow/providers/openlineage/conf.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import os +from typing import Any + +from airflow.compat.functools import cache +from airflow.configuration import conf + +_CONFIG_SECTION = "openlineage" + + +@cache +def config_path(check_legacy_env_var: bool = True) -> str: +"""[openlineage] config_path.""" +option = conf.get(_CONFIG_SECTION, "config_path", fallback="") +if check_legacy_env_var and not option: +option = os.getenv("OPENLINEAGE_CONFIG", "") +return option + + +@cache +def is_sour
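The new `config_path()` getter shown in the diff falls back to the legacy `OPENLINEAGE_CONFIG` environment variable when the `[openlineage] config_path` option is empty. A standalone sketch of that fallback chain (the `option` parameter stands in for the `conf.get(...)` result, and the `@cache` decorator from the real code is omitted so the fallback is easy to exercise):

```python
import os


def config_path(option: str, check_legacy_env_var: bool = True) -> str:
    # Mirrors the provider's fallback chain: the explicit config option wins;
    # only an empty option consults the legacy OPENLINEAGE_CONFIG variable.
    if check_legacy_env_var and not option:
        option = os.getenv("OPENLINEAGE_CONFIG", "")
    return option


os.environ["OPENLINEAGE_CONFIG"] = "/etc/openlineage.yml"
print(config_path(""))         # falls back to the env var
print(config_path("/my.yml"))  # explicit option wins
```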
(airflow) branch main updated (832d2f561a -> ea5238a81b)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 832d2f561a Add the dataset_expression as part of DagModel and DAGDetailSchema (#37826) add ea5238a81b fix: Add fallbacks when retrieving Airflow configuration to avoid errors being raised (#37994) No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/extractors/base.py | 3 ++- airflow/providers/openlineage/plugins/adapter.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-)
(airflow) branch main updated: fix: Fix parent id macro and remove unused utils (#37877)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 2852976ea6 fix: Fix parent id macro and remove unused utils (#37877) 2852976ea6 is described below commit 2852976ea6321b152ebc631d30d5526703bc6590 Author: Kacper Muda AuthorDate: Tue Mar 5 14:11:47 2024 +0100 fix: Fix parent id macro and remove unused utils (#37877) --- airflow/providers/openlineage/plugins/macros.py| 26 +++ airflow/providers/openlineage/utils/utils.py | 85 +- tests/providers/openlineage/plugins/test_macros.py | 19 +++-- tests/providers/openlineage/plugins/test_utils.py | 36 - 4 files changed, 20 insertions(+), 146 deletions(-) diff --git a/airflow/providers/openlineage/plugins/macros.py b/airflow/providers/openlineage/plugins/macros.py index a4039db2f4..fa05a60386 100644 --- a/airflow/providers/openlineage/plugins/macros.py +++ b/airflow/providers/openlineage/plugins/macros.py @@ -16,17 +16,14 @@ # under the License. from __future__ import annotations -import os import typing -from airflow.configuration import conf -from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter +from airflow.providers.openlineage.plugins.adapter import _DAG_NAMESPACE, OpenLineageAdapter +from airflow.providers.openlineage.utils.utils import get_job_name if typing.TYPE_CHECKING: from airflow.models import TaskInstance -_JOB_NAMESPACE = conf.get("openlineage", "namespace", fallback=os.getenv("OPENLINEAGE_NAMESPACE", "default")) - def lineage_run_id(task_instance: TaskInstance): """ @@ -46,21 +43,18 @@ def lineage_run_id(task_instance: TaskInstance): ) -def lineage_parent_id(run_id: str, task_instance: TaskInstance): +def lineage_parent_id(task_instance: TaskInstance): """ -Macro function which returns the generated job and run id for a given task. 
+Macro function which returns a unique identifier of given task that can be used to create ParentRunFacet. -This can be used to forward the ids from a task to a child run so the job -hierarchy is preserved. Child run can create ParentRunFacet from those ids. +This identifier is composed of the namespace, job name, and generated run id for given task, structured +as '{namespace}/{job_name}/{run_id}'. This can be used to forward task information from a task to a child +run so the job hierarchy is preserved. Child run can easily create ParentRunFacet from these information. .. seealso:: For more information on how to use this macro, take a look at the guide: :ref:`howto/macros:openlineage` """ -job_name = OpenLineageAdapter.build_task_instance_run_id( -dag_id=task_instance.dag_id, -task_id=task_instance.task.task_id, -execution_date=task_instance.execution_date, -try_number=task_instance.try_number, -) -return f"{_JOB_NAMESPACE}/{job_name}/{run_id}" +job_name = get_job_name(task_instance.task) +run_id = lineage_run_id(task_instance) +return f"{_DAG_NAMESPACE}/{job_name}/{run_id}" diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index 1f6c723883..4f8cfbff71 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -24,7 +24,6 @@ import os from contextlib import suppress from functools import wraps from typing import TYPE_CHECKING, Any, Iterable -from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse import attrs from attrs import asdict @@ -42,101 +41,19 @@ from airflow.utils.context import AirflowContextDeprecationWarning from airflow.utils.log.secrets_masker import Redactable, Redacted, SecretsMasker, should_hide_value_for_key if TYPE_CHECKING: -from airflow.models import DAG, BaseOperator, Connection, DagRun, TaskInstance +from airflow.models import DAG, BaseOperator, DagRun, TaskInstance log = logging.getLogger(__name__) _NOMINAL_TIME_FORMAT = 
"%Y-%m-%dT%H:%M:%S.%fZ" -def openlineage_job_name(dag_id: str, task_id: str) -> str: -return f"{dag_id}.{task_id}" - - def get_operator_class(task: BaseOperator) -> type: if task.__class__.__name__ in ("DecoratedMappedOperator", "MappedOperator"): return task.operator_class return task.__class__ -def to_json_encodable(task: BaseOperator) -> dict[str, object]: -def _task_encoder(obj): -from airflow.models import DAG - -if isinstance(obj, datetime.datetime): -return obj.isoformat() -elif isinstance(obj, DAG): -return { -"dag_id"
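Per the updated docstring in the commit above, `lineage_parent_id` now returns an identifier structured as `'{namespace}/{job_name}/{run_id}'`. The composition itself reduces to a one-liner; this sketch takes the three parts as plain strings (in the real macro they come from `_DAG_NAMESPACE`, `get_job_name(task_instance.task)`, and `lineage_run_id(task_instance)`):

```python
def lineage_parent_id(namespace: str, job_name: str, run_id: str) -> str:
    # '{namespace}/{job_name}/{run_id}', as described in the macro docstring;
    # a child run can split this back apart to build a ParentRunFacet.
    return f"{namespace}/{job_name}/{run_id}"


print(lineage_parent_id("default", "my_dag.my_task", "some-run-id"))
```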
(airflow) branch main updated (e904f679c0 -> 9848954e78)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from e904f679c0 Format improvements, URL fixes in contrib docs (#37820) add 9848954e78 feat: Add OpenLineage metrics for event_size and extraction time (#37797) No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/plugins/adapter.py | 29 ++ airflow/providers/openlineage/plugins/listener.py | 46 +-- 2 files changed, 56 insertions(+), 19 deletions(-)
(airflow) branch main updated: tests: Add OpenLineage test cases for File to Dataset conversion (#37791)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new e358bb2253 tests: Add OpenLineage test cases for File to Dataset conversion (#37791) e358bb2253 is described below commit e358bb2253509dcb3631db7ddffad7dc557ca97e Author: Kacper Muda AuthorDate: Thu Feb 29 13:15:35 2024 +0100 tests: Add OpenLineage test cases for File to Dataset conversion (#37791) --- airflow/providers/openlineage/extractors/manager.py | 20 ++-- .../providers/openlineage/extractors/test_manager.py | 13 + 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py index cb77c796ba..405db3d8e5 100644 --- a/airflow/providers/openlineage/extractors/manager.py +++ b/airflow/providers/openlineage/extractors/manager.py @@ -186,16 +186,24 @@ class ExtractorManager(LoggingMixin): from openlineage.client.run import Dataset +if "/" not in uri: +return None + try: scheme, netloc, path, params, _, _ = urlparse(uri) except Exception: return None -if scheme.startswith("s3"): -return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/")) -elif scheme.startswith(("gcs", "gs")): -return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/")) -elif "/" not in uri: -return None + +common_schemas = { +"s3": "s3", +"gs": "gs", +"gcs": "gs", +"hdfs": "hdfs", +"file": "file", +} +for found, final in common_schemas.items(): +if scheme.startswith(found): +return Dataset(namespace=f"{final}://{netloc}", name=path.lstrip("/")) return Dataset(namespace=scheme, name=f"{netloc}{path}") @staticmethod diff --git a/tests/providers/openlineage/extractors/test_manager.py b/tests/providers/openlineage/extractors/test_manager.py index d1f794b49d..7a790e8393 100644 --- a/tests/providers/openlineage/extractors/test_manager.py +++ 
b/tests/providers/openlineage/extractors/test_manager.py @@ -36,8 +36,13 @@ from airflow.providers.openlineage.extractors.manager import ExtractorManager ("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1", name="dir1/file1")), ("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2", name="dir2/file2")), ("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3", name="dir3/file3")), +("hdfs://namenodehost:8020/file1", Dataset(namespace="hdfs://namenodehost:8020", name="file1")), +("hdfs://namenodehost/file2", Dataset(namespace="hdfs://namenodehost", name="file2")), +("file://localhost/etc/fstab", Dataset(namespace="file://localhost", name="etc/fstab")), +("file:///etc/fstab", Dataset(namespace="file://", name="etc/fstab")), ("https://test.com", Dataset(namespace="https", name="test.com")), ("https://test.com?param1=test1&param2=test2", Dataset(namespace="https", name="test.com")), +("file:test.csv", None), ("not_an_url", None), ), ) @@ -55,6 +60,14 @@ def test_convert_to_ol_dataset_from_object_storage_uri(uri, dataset): ), (File(url="s3://bucket1/dir1/file1"), Dataset(namespace="s3://bucket1", name="dir1/file1")), (File(url="gs://bucket2/dir2/file2"), Dataset(namespace="gs://bucket2", name="dir2/file2")), +(File(url="gcs://bucket3/dir3/file3"), Dataset(namespace="gs://bucket3", name="dir3/file3")), +( +File(url="hdfs://namenodehost:8020/file1"), +Dataset(namespace="hdfs://namenodehost:8020", name="file1"), +), +(File(url="hdfs://namenodehost/file2"), Dataset(namespace="hdfs://namenodehost", name="file2")), +(File(url="file://localhost/etc/fstab"), Dataset(namespace="file://localhost", name="etc/fstab")), +(File(url="file:///etc/fstab"), Dataset(namespace="file://", name="etc/fstab")), (File(url="https://test.com"), Dataset(namespace="https", name="test.com")), (Table(cluster="c1", database="d1", name="t1"), Dataset(namespace="c1", name="d1.t1")), ("gs://bucket2/dir2/file2", None),
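The `common_schemas` change in the commit above normalizes URI schemes (e.g. `gcs` → `gs`) before building a Dataset. A self-contained sketch of that conversion, returning a `(namespace, name)` tuple where the real code builds an `openlineage.client.run.Dataset`:

```python
from urllib.parse import urlparse


def to_dataset(uri: str):
    # Sketch of convert_to_ol_dataset_from_object_storage_uri: returns a
    # (namespace, name) pair instead of an OpenLineage Dataset object.
    if "/" not in uri:  # e.g. "file:test.csv", "not_an_url"
        return None
    try:
        parsed = urlparse(uri)
    except Exception:
        return None
    scheme, netloc, path = parsed.scheme, parsed.netloc, parsed.path
    # Normalize well-known object-storage schemes, e.g. both "gs" and "gcs"
    # map to a "gs://" namespace.
    common_schemas = {"s3": "s3", "gs": "gs", "gcs": "gs", "hdfs": "hdfs", "file": "file"}
    for found, final in common_schemas.items():
        if scheme.startswith(found):
            return (f"{final}://{netloc}", path.lstrip("/"))
    # Anything else (e.g. https) keeps the bare scheme as its namespace.
    return (scheme, f"{netloc}{path}")


print(to_dataset("gcs://bucket3/dir3/file3"))  # ('gs://bucket3', 'dir3/file3')
print(to_dataset("file:test.csv"))             # None (no '/' in the URI)
```

This reproduces the behavior encoded in the new test cases: `hdfs` and `file` URIs keep their authority in the namespace, while `file:///...` yields the `file://` namespace with an empty host.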
(airflow) branch main updated: feat: Add OpenLineage support for File and User Airflow's lineage entities (#37744)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 4938ac04b6 feat: Add OpenLineage support for File and User Airflow's lineage entities (#37744) 4938ac04b6 is described below commit 4938ac04b606ab00d70c3b887e08f76a2b3ea857 Author: Kacper Muda AuthorDate: Thu Feb 29 11:40:12 2024 +0100 feat: Add OpenLineage support for File and User Airflow's lineage entities (#37744) --- .../providers/openlineage/extractors/manager.py| 76 +- .../guides/developer.rst | 62 tests/always/test_project_structure.py | 1 - .../openlineage/extractors/test_manager.py | 159 + 4 files changed, 265 insertions(+), 33 deletions(-) diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py index a5654d8bbf..cb77c796ba 100644 --- a/airflow/providers/openlineage/extractors/manager.py +++ b/airflow/providers/openlineage/extractors/manager.py @@ -34,6 +34,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string if TYPE_CHECKING: +from openlineage.client.run import Dataset + +from airflow.lineage.entities import Table from airflow.models import Operator @@ -178,19 +181,78 @@ class ExtractorManager(LoggingMixin): task_metadata.outputs.append(d) @staticmethod -def convert_to_ol_dataset(obj): +def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None: +from urllib.parse import urlparse + +from openlineage.client.run import Dataset + +try: +scheme, netloc, path, params, _, _ = urlparse(uri) +except Exception: +return None +if scheme.startswith("s3"): +return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/")) +elif scheme.startswith(("gcs", "gs")): +return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/")) +elif "/" not in uri: +return None +return 
Dataset(namespace=scheme, name=f"{netloc}{path}") + +@staticmethod +def convert_to_ol_dataset_from_table(table: Table) -> Dataset: +from openlineage.client.facet import ( +BaseFacet, +OwnershipDatasetFacet, +OwnershipDatasetFacetOwners, +SchemaDatasetFacet, +SchemaField, +) +from openlineage.client.run import Dataset + +facets: dict[str, BaseFacet] = {} +if table.columns: +facets["schema"] = SchemaDatasetFacet( +fields=[ +SchemaField( +name=column.name, +type=column.data_type, +description=column.description, +) +for column in table.columns +] +) +if table.owners: +facets["ownership"] = OwnershipDatasetFacet( +owners=[ +OwnershipDatasetFacetOwners( +# f.e. "user:John Doe " or just "user:" +name=f"user:" +f"{user.first_name + ' ' if user.first_name else ''}" +f"{user.last_name + ' ' if user.last_name else ''}" +f"<{user.email}>", +type="", +) +for user in table.owners +] +) +return Dataset( +namespace=f"{table.cluster}", +name=f"{table.database}.{table.name}", +facets=facets, +) + +@staticmethod +def convert_to_ol_dataset(obj) -> Dataset | None: from openlineage.client.run import Dataset -from airflow.lineage.entities import Table +from airflow.lineage.entities import File, Table if isinstance(obj, Dataset): return obj elif isinstance(obj, Table): -return Dataset( -namespace=f"{obj.cluster}", -name=f"{obj.database}.{obj.name}", -facets={}, -) +return ExtractorManager.convert_to_ol_dataset_from_table(obj) +elif isinstance(obj, File): +return ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(obj.url) else: return None diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst b/docs/apache-airflow-providers-openlin
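The `OwnershipDatasetFacet` construction in the commit above formats each table owner as `user:First Last <email>`, dropping whichever name parts are missing. That formatting can be sketched on its own (plain strings here; the real code reads `user.first_name`, `user.last_name`, and `user.email` from Airflow's `User` lineage entity):

```python
def owner_name(first_name, last_name, email):
    # e.g. "user:John Doe <jdoe@example.com>", or just "user:<jdoe@example.com>"
    # when neither name part is set — mirroring the facet's name field.
    return (
        "user:"
        f"{first_name + ' ' if first_name else ''}"
        f"{last_name + ' ' if last_name else ''}"
        f"<{email}>"
    )


print(owner_name("John", "Doe", "jdoe@example.com"))
print(owner_name(None, None, "jdoe@example.com"))
```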
(airflow) branch main updated: docs: Update whole OpenLineage Provider docs. (#37620)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new fb6511212e docs: Update whole OpenLineage Provider docs. (#37620) fb6511212e is described below commit fb6511212e6b5d552b69fdd05cb8c9501cc1ab18 Author: Kacper Muda AuthorDate: Wed Feb 28 15:13:40 2024 +0100 docs: Update whole OpenLineage Provider docs. (#37620) --- airflow/providers/openlineage/provider.yaml| 37 +- .../configurations-ref.rst | 3 + .../guides/developer.rst | 412 +++-- .../guides/structure.rst | 51 ++- .../guides/user.rst| 244 ++-- .../apache-airflow-providers-openlineage/index.rst | 28 +- .../macros.rst | 6 +- .../supported_classes.rst | 15 +- 8 files changed, 693 insertions(+), 103 deletions(-) diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml index 5ba4a0ee32..6871e05e8d 100644 --- a/airflow/providers/openlineage/provider.yaml +++ b/airflow/providers/openlineage/provider.yaml @@ -58,65 +58,68 @@ config: openlineage: description: | This section applies settings for OpenLineage integration. - For backwards compatibility with `openlineage-python` one can still use - `openlineage.yml` file or `OPENLINEAGE_` environment variables. However, below - configuration takes precedence over those. - More in documentation - https://openlineage.io/docs/client/python#configuration. + More about configuration and it's precedence can be found at + https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html#transport-setup options: disabled: description: | - Set this to true if you don't want OpenLineage to emit events. + Disable sending events without uninstalling the OpenLineage Provider by setting this to true. 
type: boolean example: ~ default: "False" version_added: ~ disabled_for_operators: description: | - Semicolon separated string of Airflow Operator names to disable + Exclude some Operators from emitting OpenLineage events by passing a string of semicolon separated + full import paths of Operators to disable. type: string example: "airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator" default: "" version_added: 1.1.0 namespace: description: | - OpenLineage namespace + Set namespace that the lineage data belongs to, so that if you use multiple OpenLineage producers, + events coming from them will be logically separated. version_added: ~ type: string -example: "food_delivery" +example: "my_airflow_instance_1" default: ~ extractors: description: | - Semicolon separated paths to custom OpenLineage extractors. + Register custom OpenLineage Extractors by passing a string of semicolon separated full import paths. type: string example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass default: ~ version_added: ~ config_path: description: | - Path to YAML config. This provides backwards compatibility to pass config as - `openlineage.yml` file. + Specify the path to the YAML configuration file. + This ensures backwards compatibility with passing config through the `openlineage.yml` file. version_added: ~ type: string -example: ~ +example: "full/path/to/openlineage.yml" default: "" transport: description: | - OpenLineage Client transport configuration. It should contain type - and additional options per each type. + Pass OpenLineage Client transport configuration as JSON string. It should contain type of the + transport and additional options (different for each transport type). 
For more details see: + https://openlineage.io/docs/client/python/#built-in-transport-types Currently supported types are: * HTTP * Kafka * Console +* File type: string -example: '{"type": "http", "url": "http://localhost:5000"}' +example: '{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}' default: "" version_added: ~
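The updated `transport` docs above describe the option as a JSON string carrying the transport `type` plus type-specific keys. Decoding the documented HTTP example is plain `json.loads`:

```python
import json

# The [openlineage] transport option from the docs: a JSON string holding
# the transport type and its type-specific options (here, url + endpoint).
raw = '{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'

transport = json.loads(raw)
print(transport["type"])      # which built-in transport to use
print(transport["url"], transport["endpoint"])
```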
(airflow) branch main updated (0c2d2c6544 -> 5289140a03)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 0c2d2c6544 Implement `batch_is_authorized_*` APIs in AWS auth manager (#37430) add 5289140a03 chore: Update comments and logging in OpenLineage ExtractorManager (#37622) No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/extractors/manager.py | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-)
(airflow) branch main updated (f2ea8a3e17 -> 1851a71278)
mobuchowski pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from f2ea8a3e17 Revoking audit_log permission from all users except admin (#37501) add 1851a71278 [OpenLineage] Add support for JobTypeJobFacet properties. (#37255) No new revisions were added by this update. Summary of changes: airflow/providers/openlineage/plugins/adapter.py | 21 -- .../plugins/test_openlineage_adapter.py| 83 +++--- 2 files changed, 91 insertions(+), 13 deletions(-)
(airflow) branch main updated: docs: Add doc page with providers deprecations (#37075)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new b341b5927e docs: Add doc page with providers deprecations (#37075) b341b5927e is described below commit b341b5927e6a7271a85558d3da766b07165ee22a Author: Kacper Muda AuthorDate: Wed Feb 14 15:19:54 2024 +0100 docs: Add doc page with providers deprecations (#37075) --- airflow/providers/amazon/aws/hooks/base_aws.py | 4 +- .../providers/amazon/aws/hooks/redshift_cluster.py | 2 +- docs/README.rst| 13 +++ .../core-extensions/deprecations.rst | 31 ++ docs/exts/docs_build/errors.py | 5 +- docs/exts/docs_build/spelling_checks.py| 13 ++- docs/exts/operators_and_hooks_ref.py | 115 + docs/exts/templates/deprecations.rst.jinja2| 37 +++ 8 files changed, 212 insertions(+), 8 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py index 5f1e075cc3..ff2f33dbe8 100644 --- a/airflow/providers/amazon/aws/hooks/base_aws.py +++ b/airflow/providers/amazon/aws/hooks/base_aws.py @@ -1022,7 +1022,7 @@ except ImportError: @deprecated( reason=( -"airflow.providers.amazon.aws.hook.base_aws.BaseAsyncSessionFactory " +"`airflow.providers.amazon.aws.hook.base_aws.BaseAsyncSessionFactory` " "has been deprecated and will be removed in future" ), category=AirflowProviderDeprecationWarning, @@ -1116,7 +1116,7 @@ class BaseAsyncSessionFactory(BaseSessionFactory): @deprecated( reason=( -"airflow.providers.amazon.aws.hook.base_aws.AwsBaseAsyncHook " +"`airflow.providers.amazon.aws.hook.base_aws.AwsBaseAsyncHook` " "has been deprecated and will be removed in future" ), category=AirflowProviderDeprecationWarning, diff --git a/airflow/providers/amazon/aws/hooks/redshift_cluster.py b/airflow/providers/amazon/aws/hooks/redshift_cluster.py index 286299661c..6168278a59 100644 --- 
a/airflow/providers/amazon/aws/hooks/redshift_cluster.py +++ b/airflow/providers/amazon/aws/hooks/redshift_cluster.py @@ -197,7 +197,7 @@ class RedshiftHook(AwsBaseHook): @deprecated( reason=( -"airflow.providers.amazon.aws.hook.base_aws.RedshiftAsyncHook " +"`airflow.providers.amazon.aws.hook.base_aws.RedshiftAsyncHook` " "has been deprecated and will be removed in future" ), category=AirflowProviderDeprecationWarning, diff --git a/docs/README.rst b/docs/README.rst index d5cddec486..9cb436c0f2 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -258,6 +258,19 @@ warning to report the wrong line in the file for your missing white space. If yo and the line number it reports is wrong, this is a known error. You can find the missing blank space by searching for the syntax you used to make your list, code block, or other whitespace-sensitive markup element. + +spelling error with class or method name + +When a spelling error occurs that has a class/function/method name as incorrectly spelled, +instead of whitelisting it in docs/spelling_wordlist.txt you should make sure that +this name is quoted with backticks "`" - this should exclude it from spellchecking process. + +In this example error, You should change the line with error so that the whole path is inside backticks "`". +.. code-block:: text + +Incorrect Spelling: 'BaseAsyncSessionFactory' +Line with Error: ' airflow.providers.amazon.aws.hook.base_aws.BaseAsyncSessionFactory has been deprecated' + Support === diff --git a/docs/apache-airflow-providers/core-extensions/deprecations.rst b/docs/apache-airflow-providers/core-extensions/deprecations.rst new file mode 100644 index 00..177ea77ab9 --- /dev/null +++ b/docs/apache-airflow-providers/core-extensions/deprecations.rst @@ -0,0 +1,31 @@ + .. Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implie
(airflow) branch main updated: fix: Check if operator is disabled in DefaultExtractor.extract_on_complete (#37392)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 61f0adf0c7 fix: Check if operator is disabled in DefaultExtractor.extract_on_complete (#37392) 61f0adf0c7 is described below commit 61f0adf0c7c2f93ade27686571992e3ef2a65e35 Author: Kacper Muda AuthorDate: Wed Feb 14 11:55:33 2024 +0100 fix: Check if operator is disabled in DefaultExtractor.extract_on_complete (#37392) --- airflow/providers/openlineage/extractors/base.py | 18 ++ tests/providers/openlineage/extractors/test_base.py | 14 ++ tests/providers/openlineage/extractors/test_bash.py | 13 + tests/providers/openlineage/extractors/test_python.py | 2 ++ 4 files changed, 43 insertions(+), 4 deletions(-) diff --git a/airflow/providers/openlineage/extractors/base.py b/airflow/providers/openlineage/extractors/base.py index 3e18bb2399..d87334f486 100644 --- a/airflow/providers/openlineage/extractors/base.py +++ b/airflow/providers/openlineage/extractors/base.py @@ -70,6 +70,13 @@ class BaseExtractor(ABC, LoggingMixin): operator.strip() for operator in conf.get("openlineage", "disabled_for_operators").split(";") ) +@cached_property +def _is_operator_disabled(self) -> bool: +fully_qualified_class_name = ( +self.operator.__class__.__module__ + "." + self.operator.__class__.__name__ +) +return fully_qualified_class_name in self.disabled_operators + def validate(self): assert self.operator.task_type in self.get_operator_classnames() @@ -78,10 +85,7 @@ class BaseExtractor(ABC, LoggingMixin): ... def extract(self) -> OperatorLineage | None: -fully_qualified_class_name = ( -self.operator.__class__.__module__ + "." 
+ self.operator.__class__.__name__ -) -if fully_qualified_class_name in self.disabled_operators: +if self._is_operator_disabled: self.log.debug( f"Skipping extraction for operator {self.operator.task_type} " "due to its presence in [openlineage] openlineage_disabled_for_operators." @@ -123,6 +127,12 @@ class DefaultExtractor(BaseExtractor): return None def extract_on_complete(self, task_instance) -> OperatorLineage | None: +if self._is_operator_disabled: +self.log.debug( +f"Skipping extraction for operator {self.operator.task_type} " +"due to its presence in [openlineage] openlineage_disabled_for_operators." +) +return None if task_instance.state == TaskInstanceState.FAILED: on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None) if on_failed and callable(on_failed): diff --git a/tests/providers/openlineage/extractors/test_base.py b/tests/providers/openlineage/extractors/test_base.py index 309e0c1a79..35d51ee293 100644 --- a/tests/providers/openlineage/extractors/test_base.py +++ b/tests/providers/openlineage/extractors/test_base.py @@ -285,3 +285,17 @@ def test_default_extractor_uses_wrong_operatorlineage_class(): assert ( ExtractorManager().extract_metadata(mock.MagicMock(), operator, complete=False) == OperatorLineage() ) + + +@mock.patch.dict( +os.environ, +{ +"AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS": "tests.providers.openlineage.extractors.test_base.ExampleOperator" +}, +) +def test_default_extraction_disabled_operator(): +extractor = DefaultExtractor(ExampleOperator(task_id="test")) +metadata = extractor.extract() +assert metadata is None +metadata = extractor.extract_on_complete(None) +assert metadata is None diff --git a/tests/providers/openlineage/extractors/test_bash.py b/tests/providers/openlineage/extractors/test_bash.py index 4919f2b873..b5fe07741e 100644 --- a/tests/providers/openlineage/extractors/test_bash.py +++ b/tests/providers/openlineage/extractors/test_bash.py @@ -117,3 +117,16 @@ def 
test_extract_dag_bash_command_env_does_not_disable_on_random_string(): def test_extract_dag_bash_command_conf_does_not_disable_on_random_string(): extractor = BashExtractor(bash_task) assert extractor.extract().job_facets["sourceCode"] == SourceCodeJobFacet("bash", "ls -halt && exit 0") + + +@patch.dict( +os.environ, +{"AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS": "airflow.operators.bash.BashOperator"}, +) +def test_bash_extraction_disabled_operator(): +operator = BashOperator(task_id="taskid", bash_command="echo 1;") +extractor = Bash
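The commit above moves the fully-qualified-class-name check into a cached `_is_operator_disabled` property so that both `extract` and `extract_on_complete` short-circuit for disabled operators. A standalone sketch of that pattern (names simplified; in Airflow the disabled set comes from `conf.get("openlineage", "disabled_for_operators")`, here it is passed in directly):

```python
from functools import cached_property


class ExampleOperator:
    task_type = "ExampleOperator"


class BaseExtractor:
    def __init__(self, operator, disabled_for_operators=""):
        self.operator = operator
        # semicolon-separated list of fully qualified operator class names
        self.disabled_operators = {
            name.strip() for name in disabled_for_operators.split(";") if name.strip()
        }

    @cached_property
    def _is_operator_disabled(self) -> bool:
        # same check the commit factors out: module path + class name
        fully_qualified_class_name = (
            self.operator.__class__.__module__ + "." + self.operator.__class__.__name__
        )
        return fully_qualified_class_name in self.disabled_operators

    def extract(self):
        # after the fix, extract_on_complete() takes this same early-return path
        if self._is_operator_disabled:
            return None
        return {"job_facets": {}}
```

With the operator's fully qualified name in the disabled set, extraction yields `None`; otherwise it proceeds normally.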
(airflow) branch main updated: fix static checks for openlineage provider (#37092)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 674ea5dd9e fix static checks for openlineage provider (#37092) 674ea5dd9e is described below commit 674ea5dd9e6704203a98b4031c0dd5f248ca0407 Author: Vadim Vladimirov <47372390+impressionableracc...@users.noreply.github.com> AuthorDate: Tue Jan 30 15:19:49 2024 +0300 fix static checks for openlineage provider (#37092) related to this PR: https://github.com/apache/airflow/pull/36222 --- docs/apache-airflow-providers-openlineage/macros.rst | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/apache-airflow-providers-openlineage/macros.rst b/docs/apache-airflow-providers-openlineage/macros.rst index b71842a575..3526ee1d37 100644 --- a/docs/apache-airflow-providers-openlineage/macros.rst +++ b/docs/apache-airflow-providers-openlineage/macros.rst @@ -29,7 +29,9 @@ Lineage run id PythonOperator( task_id="render_template", python_callable=my_task_function, -op_args=["{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}"], # lineage_run_id macro invoked +op_args=[ +"{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}" +], # lineage_run_id macro invoked provide_context=False, dag=dag, ) @@ -41,7 +43,9 @@ Lineage parent id PythonOperator( task_id="render_template", python_callable=my_task_function, -op_args=["{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id, task_instance) }}"], # macro invoked +op_args=[ +"{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id, task_instance) }}" +], # macro invoked provide_context=False, dag=dag, )
(airflow) branch main updated: Fix macros jinja template example (#36222)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new c81be6f78c Fix macros jinja template example (#36222) c81be6f78c is described below commit c81be6f78cc1a866170c98e3bc5f0c26d0fa7c02 Author: Honza Stepanovsky AuthorDate: Tue Jan 30 10:21:59 2024 +0100 Fix macros jinja template example (#36222) * Fix macros jinja template example * Fix args as well --- docs/apache-airflow-providers-openlineage/macros.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/apache-airflow-providers-openlineage/macros.rst b/docs/apache-airflow-providers-openlineage/macros.rst index 0aed55feb9..b71842a575 100644 --- a/docs/apache-airflow-providers-openlineage/macros.rst +++ b/docs/apache-airflow-providers-openlineage/macros.rst @@ -29,7 +29,7 @@ Lineage run id PythonOperator( task_id="render_template", python_callable=my_task_function, -op_args=["{{ lineage_run_id(task, task_instance) }}"], # lineage_run_id macro invoked +op_args=["{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}"], # lineage_run_id macro invoked provide_context=False, dag=dag, ) @@ -41,7 +41,7 @@ Lineage parent id PythonOperator( task_id="render_template", python_callable=my_task_function, -op_args=["{{ lineage_parent_id(run_id, task_instance) }}"], # macro invoked +op_args=["{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id, task_instance) }}"], # macro invoked provide_context=False, dag=dag, )
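The doc fix above changes the templates to call the macros through their plugin namespace (`macros.OpenLineageProviderPlugin.lineage_run_id`) instead of as bare names, with `lineage_run_id` taking only `task_instance`. A stdlib-only sketch of why the namespaced lookup resolves — the macro body below is a hypothetical stand-in, not the real implementation:

```python
from types import SimpleNamespace


def lineage_run_id(task_instance):
    # hypothetical stand-in for the real macro's return value
    return f"{task_instance.dag_id}.{task_instance.task_id}.{task_instance.try_number}"


# Plugin-provided macros are exposed under macros.<PluginName>.<macro_name>,
# which is why the bare {{ lineage_run_id(...) }} form in the old example
# failed to resolve during template rendering.
macros = SimpleNamespace(
    OpenLineageProviderPlugin=SimpleNamespace(lineage_run_id=lineage_run_id)
)

ti = SimpleNamespace(dag_id="my_dag", task_id="render_template", try_number=1)
run_id = macros.OpenLineageProviderPlugin.lineage_run_id(ti)
```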
(airflow) branch main updated: feat: Add dag_id when generating OpenLineage run_id for task instance. (#36659)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 95a83102e8 feat: Add dag_id when generating OpenLineage run_id for task instance. (#36659) 95a83102e8 is described below commit 95a83102e8753c2f8caf5b0d5c847f4c7f254f67 Author: Kacper Muda AuthorDate: Tue Jan 9 13:15:07 2024 +0100 feat: Add dag_id when generating OpenLineage run_id for task instance. (#36659) --- airflow/providers/dbt/cloud/utils/openlineage.py | 5 +- airflow/providers/openlineage/plugins/adapter.py | 4 +- airflow/providers/openlineage/plugins/listener.py | 27 ++ airflow/providers/openlineage/plugins/macros.py| 10 +++- tests/always/test_project_structure.py | 1 - .../providers/openlineage/plugins/test_listener.py | 57 +- tests/providers/openlineage/plugins/test_macros.py | 52 .../plugins/test_openlineage_adapter.py| 44 - 8 files changed, 149 insertions(+), 51 deletions(-) diff --git a/airflow/providers/dbt/cloud/utils/openlineage.py b/airflow/providers/dbt/cloud/utils/openlineage.py index 6a0934d412..f86c77a689 100644 --- a/airflow/providers/dbt/cloud/utils/openlineage.py +++ b/airflow/providers/dbt/cloud/utils/openlineage.py @@ -121,7 +121,10 @@ def generate_openlineage_events_from_dbt_cloud_run( # generate same run id of current task instance parent_run_id = OpenLineageAdapter.build_task_instance_run_id( -operator.task_id, task_instance.execution_date, task_instance.try_number - 1 +dag_id=task_instance.dag_id, +task_id=operator.task_id, +execution_date=task_instance.execution_date, +try_number=task_instance.try_number - 1, ) parent_job = ParentRunMetadata( diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index ad648f8828..6f16b01706 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ 
b/airflow/providers/openlineage/plugins/adapter.py @@ -102,11 +102,11 @@ class OpenLineageAdapter(LoggingMixin): return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{dag_id}.{dag_run_id}")) @staticmethod -def build_task_instance_run_id(task_id, execution_date, try_number): +def build_task_instance_run_id(dag_id, task_id, execution_date, try_number): return str( uuid.uuid3( uuid.NAMESPACE_URL, -f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}", + f"{_DAG_NAMESPACE}.{dag_id}.{task_id}.{execution_date}.{try_number}", ) ) diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 6fa02b9fc1..8c731dd6ff 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -77,7 +77,10 @@ class OpenLineageListener: parent_run_id = self.adapter.build_dag_run_id(dag.dag_id, dagrun.run_id) task_uuid = self.adapter.build_task_instance_run_id( -task.task_id, task_instance.execution_date, task_instance.try_number +dag_id=dag.dag_id, +task_id=task.task_id, +execution_date=task_instance.execution_date, +try_number=task_instance.try_number, ) task_metadata = self.extractor_manager.extract_metadata(dagrun, task) @@ -116,14 +119,17 @@ class OpenLineageListener: task = task_instance.task dag = task.dag -task_uuid = OpenLineageAdapter.build_task_instance_run_id( -task.task_id, task_instance.execution_date, task_instance.try_number - 1 -) - @print_warning(self.log) def on_success(): parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id) +task_uuid = OpenLineageAdapter.build_task_instance_run_id( +dag_id=dag.dag_id, +task_id=task.task_id, +execution_date=task_instance.execution_date, +try_number=task_instance.try_number - 1, +) + task_metadata = self.extractor_manager.extract_metadata( dagrun, task, complete=True, task_instance=task_instance ) @@ -149,14 +155,17 @@ class OpenLineageListener: task = task_instance.task dag = task.dag 
-task_uuid = OpenLineageAdapter.build_task_instance_run_id( -task.task_id, task_instance.execution_date, task_instance.try_number -) - @print_warning(self.log) def on_failure(): parent_run_id = OpenLineageAdapter.build_dag_run
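The change threads `dag_id` into `build_task_instance_run_id`, so tasks that share a `task_id` across different DAGs no longer collide. Because `uuid.uuid3` is deterministic, the same inputs always reproduce the same id across processes. A sketch, assuming `_DAG_NAMESPACE` is simply a configured string:

```python
import uuid

_DAG_NAMESPACE = "default"  # assumption: stand-in for the configured namespace


def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
    # uuid3 hashes the seed string, so including dag_id keeps identical
    # task_ids in different DAGs distinct, and re-running with the same
    # arguments regenerates the identical run id.
    return str(
        uuid.uuid3(
            uuid.NAMESPACE_URL,
            f"{_DAG_NAMESPACE}.{dag_id}.{task_id}.{execution_date}.{try_number}",
        )
    )


a = build_task_instance_run_id("dag_a", "load", "2024-01-09", 1)
b = build_task_instance_run_id("dag_b", "load", "2024-01-09", 1)
```

`a != b` even though only the `dag_id` differs, which is exactly the collision the commit fixes.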
(airflow) branch main updated: feat: Add openlineage support for CopyFromExternalStageToSnowflakeOperator (#36535)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 3dc99d8a28 feat: Add openlineage support for CopyFromExternalStageToSnowflakeOperator (#36535) 3dc99d8a28 is described below commit 3dc99d8a285aaadeb83797e691c9f6ec93ff9c93 Author: Kacper Muda AuthorDate: Mon Jan 8 13:02:46 2024 +0100 feat: Add openlineage support for CopyFromExternalStageToSnowflakeOperator (#36535) --- .../snowflake/transfers/copy_into_snowflake.py | 163 +++- .../transfers/test_copy_into_snowflake.py | 168 - 2 files changed, 327 insertions(+), 4 deletions(-) diff --git a/airflow/providers/snowflake/transfers/copy_into_snowflake.py b/airflow/providers/snowflake/transfers/copy_into_snowflake.py index 10071add1a..342d5dc35a 100644 --- a/airflow/providers/snowflake/transfers/copy_into_snowflake.py +++ b/airflow/providers/snowflake/transfers/copy_into_snowflake.py @@ -108,8 +108,12 @@ class CopyFromExternalStageToSnowflakeOperator(BaseOperator): self.copy_options = copy_options self.validation_mode = validation_mode +self.hook: SnowflakeHook | None = None +self._sql: str | None = None +self._result: list[dict[str, Any]] = [] + def execute(self, context: Any) -> None: -snowflake_hook = SnowflakeHook( +self.hook = SnowflakeHook( snowflake_conn_id=self.snowflake_conn_id, warehouse=self.warehouse, database=self.database, @@ -127,7 +131,7 @@ class CopyFromExternalStageToSnowflakeOperator(BaseOperator): if self.columns_array: into = f"{into}({', '.join(self.columns_array)})" -sql = f""" +self._sql = f""" COPY INTO {into} FROM @{self.stage}/{self.prefix or ""} {"FILES=(" + ",".join(map(enclose_param, self.files)) + ")" if self.files else ""} @@ -137,5 +141,158 @@ class CopyFromExternalStageToSnowflakeOperator(BaseOperator): {self.validation_mode or ""} """ self.log.info("Executing COPY command...") 
-snowflake_hook.run(sql=sql, autocommit=self.autocommit) +self._result = self.hook.run( # type: ignore # mypy does not work well with return_dictionaries=True +sql=self._sql, +autocommit=self.autocommit, +handler=lambda x: x.fetchall(), +return_dictionaries=True, +) self.log.info("COPY command completed") + +@staticmethod +def _extract_openlineage_unique_dataset_paths( +query_result: list[dict[str, Any]], +) -> tuple[list[tuple[str, str]], list[str]]: +"""Extracts and returns unique OpenLineage dataset paths and file paths that failed to be parsed. + +Each row in the results is expected to have a 'file' field, which is a URI. +The function parses these URIs and constructs a set of unique OpenLineage (namespace, name) tuples. +Additionally, it captures any URIs that cannot be parsed or processed +and returns them in a separate error list. + +For Azure, Snowflake has a unique way of representing URI: + azure://.blob.core.windows.net//path/to/file.csv +that is transformed by this function to a Dataset with more universal naming convention: +Dataset(namespace="wasbs://container_name@account_name", name="path/to"), as described at + https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md#wasbs-azure-blob-storage + +:param query_result: A list of dictionaries, each containing a 'file' key with a URI value. +:return: Two lists - the first is a sorted list of tuples, each representing a unique dataset path, + and the second contains any URIs that cannot be parsed or processed correctly. 
+ +>>> method = CopyFromExternalStageToSnowflakeOperator._extract_openlineage_unique_dataset_paths + +>>> results = [{"file": "azure://my_account.blob.core.windows.net/azure_container/dir3/file.csv"}] +>>> method(results) +([('wasbs://azure_container@my_account', 'dir3')], []) + +>>> results = [{"file": "azure://my_account.blob.core.windows.net/azure_container"}] +>>> method(results) +([('wasbs://azure_container@my_account', '/')], []) + +>>> results = [{"file": "s3://bucket"}, {"file": "gcs://bucket/"}, {"file": "s3://bucket/a.csv"}] +>>> method(results) +
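The docstring's doctests show Snowflake's Azure URI form being normalized into the OpenLineage `wasbs://container@account` naming convention. A sketch of just that Azure branch (the real method also handles other schemes and collects unparsable URIs into a separate error list):

```python
from urllib.parse import urlparse


def azure_uri_to_dataset(uri: str) -> tuple[str, str]:
    # azure://<account>.blob.core.windows.net/<container>/path/to/file.csv
    parsed = urlparse(uri)
    account = parsed.netloc.split(".")[0]
    parts = [p for p in parsed.path.split("/") if p]
    container, path_parts = parts[0], parts[1:]
    # OpenLineage naming: namespace is wasbs://<container>@<account>,
    # name is the directory holding the file (or "/" at container root)
    namespace = f"wasbs://{container}@{account}"
    name = "/".join(path_parts[:-1]) or "/"
    return namespace, name
```

This reproduces the two doctest examples from the commit: a file under `dir3` maps to name `dir3`, and a bare container maps to name `/`.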
(airflow) branch remove-experimental-listener updated (e069ab4274 -> be957e18ff)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch remove-experimental-listener
in repository https://gitbox.apache.org/repos/asf/airflow.git

 omit e069ab4274 Update newsfragments/1234.significant.rst
 omit 6db16cc01c note that task instance, dag and lifecycle listeners are non-experimental
  add be957e18ff note that task instance, dag and lifecycle listeners are non-experimental

This update added new revisions after undoing existing revisions. That is
to say, some revisions that were in the old version of the branch are not
in the new version. This situation occurs when a user --force pushes a
change and generates a repository containing something like this:

 * -- * -- B -- O -- O -- O (e069ab4274)
            \
             N -- N -- N refs/heads/remove-experimental-listener (be957e18ff)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from
the common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them. Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 newsfragments/{1234.significant.rst => 36376.significant.rst} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename newsfragments/{1234.significant.rst => 36376.significant.rst} (100%)

(airflow) branch remove-experimental-listener updated (6db16cc01c -> e069ab4274)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch remove-experimental-listener
in repository https://gitbox.apache.org/repos/asf/airflow.git

 from 6db16cc01c note that task instance, dag and lifecycle listeners are non-experimental
  add e069ab4274 Update newsfragments/1234.significant.rst

No new revisions were added by this update.

Summary of changes:
 newsfragments/1234.significant.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
(airflow) branch main updated: Fix typo. (#36362)
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new fcb527ae94 Fix typo. (#36362) fcb527ae94 is described below commit fcb527ae94f44e610af3e2e85cbf5f223aa2e61b Author: Jakub Dardzinski AuthorDate: Fri Dec 22 11:14:37 2023 +0100 Fix typo. (#36362) Change default value of openlineage extractors config. Signed-off-by: Jakub Dardzinski --- .../providers/openlineage/extractors/manager.py| 5 ++-- airflow/providers/openlineage/provider.yaml| 2 +- .../providers/openlineage/extractors/test_base.py | 27 ++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py index 480ffc9f39..6d9fabc5cf 100644 --- a/airflow/providers/openlineage/extractors/manager.py +++ b/airflow/providers/openlineage/extractors/manager.py @@ -63,8 +63,9 @@ class ExtractorManager(LoggingMixin): for operator_class in extractor.get_operator_classnames(): self.extractors[operator_class] = extractor -env_extractors = conf.get("openlinege", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS")) -if env_extractors is not None: +env_extractors = conf.get("openlineage", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS")) +# skip either when it's empty string or None +if env_extractors: for extractor in env_extractors.split(";"): extractor: type[BaseExtractor] = try_import_from_string(extractor.strip()) for operator_class in extractor.get_operator_classnames(): diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml index 9bd98e8b93..ebfe31749b 100644 --- a/airflow/providers/openlineage/provider.yaml +++ b/airflow/providers/openlineage/provider.yaml @@ -85,7 +85,7 @@ config: Semicolon separated paths to custom OpenLineage extractors. 
type: string example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass -default: "" +default: ~ version_added: ~ config_path: description: | diff --git a/tests/providers/openlineage/extractors/test_base.py b/tests/providers/openlineage/extractors/test_base.py index 7c2174fe5b..309e0c1a79 100644 --- a/tests/providers/openlineage/extractors/test_base.py +++ b/tests/providers/openlineage/extractors/test_base.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import os from typing import Any from unittest import mock @@ -27,11 +28,13 @@ from openlineage.client.run import Dataset from airflow.models.baseoperator import BaseOperator from airflow.operators.python import PythonOperator from airflow.providers.openlineage.extractors.base import ( +BaseExtractor, DefaultExtractor, OperatorLineage, ) from airflow.providers.openlineage.extractors.manager import ExtractorManager from airflow.providers.openlineage.extractors.python import PythonExtractor +from tests.test_utils.config import conf_vars pytestmark = pytest.mark.db_test @@ -52,6 +55,12 @@ class CompleteRunFacet(BaseFacet): FINISHED_FACETS: dict[str, BaseFacet] = {"complete": CompleteRunFacet(True)} +class ExampleExtractor(BaseExtractor): +@classmethod +def get_operator_classnames(cls): +return ["ExampleOperator"] + + class ExampleOperator(BaseOperator): def execute(self, context) -> Any: pass @@ -221,6 +230,24 @@ def test_extraction_without_on_start(): ) +@mock.patch.dict( +os.environ, +{"OPENLINEAGE_EXTRACTORS": "tests.providers.openlineage.extractors.test_base.ExampleExtractor"}, +) +def test_extractors_env_var(): +extractor = ExtractorManager().get_extractor_class(ExampleOperator(task_id="example")) +assert extractor is ExampleExtractor + + +@mock.patch.dict(os.environ, {"OPENLINEAGE_EXTRACTORS": "no.such.extractor"}) +@conf_vars( +{("openlineage", "extractors"): "tests.providers.openlineage.extractors.test_base.ExampleExtractor"} +) +def 
test_config_has_precedence_over_env_var(): +extractor = ExtractorManager().get_extractor_class(ExampleOperator(task_id="example")) +assert extractor is ExampleExtractor + + def test_does_not_use_default_extractor_when_not_a_method(): extractor_class = ExtractorManager().get_extractor_class(BrokenOperator(task_id="a")) assert extractor_class is None
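Beyond correcting the `openlinege` → `openlineage` section-name typo, the commit changes the config default from `""` to `~` (None) and skips an empty string the same way as a missing value, while keeping the config option's precedence over the `OPENLINEAGE_EXTRACTORS` environment variable. A sketch of the fixed lookup logic:

```python
def resolve_extractors(conf_value, env):
    """Sketch of the fixed lookup: the [openlineage] extractors config wins,
    the OPENLINEAGE_EXTRACTORS env var is only a fallback, and both an
    empty string and None mean "no custom extractors"."""
    value = conf_value if conf_value is not None else env.get("OPENLINEAGE_EXTRACTORS")
    if not value:  # skip either when it's empty string or None
        return []
    return [path.strip() for path in value.split(";")]
```

The behavior mirrors the new tests: the env var applies only when the config is unset, and an explicitly configured value shadows it.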
(airflow) branch remove-experimental-listener created (now 6db16cc01c)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch remove-experimental-listener
in repository https://gitbox.apache.org/repos/asf/airflow.git

  at 6db16cc01c note that task instance, dag and lifecycle listeners are non-experimental

This branch includes the following new commits:

 new 6db16cc01c note that task instance, dag and lifecycle listeners are non-experimental

The 1 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails. The revisions listed as "add" were
already present in the repository and have only been added to this reference.
(airflow) 01/01: note that task instance, dag and lifecycle listeners are non-experimental
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch remove-experimental-listener in repository https://gitbox.apache.org/repos/asf/airflow.git commit 6db16cc01c080d8f6434fe19fddcf8d4df798a53 Author: Maciej Obuchowski AuthorDate: Thu Dec 21 11:46:28 2023 +0100 note that task instance, dag and lifecycle listeners are non-experimental Signed-off-by: Maciej Obuchowski --- .../administration-and-deployment/listeners.rst| 8 newsfragments/1234.significant.rst | 18 ++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/docs/apache-airflow/administration-and-deployment/listeners.rst b/docs/apache-airflow/administration-and-deployment/listeners.rst index 0672e07779..19f5d27646 100644 --- a/docs/apache-airflow/administration-and-deployment/listeners.rst +++ b/docs/apache-airflow/administration-and-deployment/listeners.rst @@ -58,6 +58,9 @@ Dataset Events Dataset events occur when Dataset management operations are run. +|experimental| + + Usage - @@ -68,9 +71,6 @@ To create a listener: Airflow defines the specification as `hookspec <https://github.com/apache/airflow/tree/main/airflow/listeners/spec>`__. Your implementation must accept the same named parameters as defined in hookspec. If you don't use the same parameters as hookspec, Pluggy throws an error when you try to use your plugin. But you don't need to implement every method. Many listeners only implement one method, or a subset of methods. -To include the listener in your Airflow installation, include it as a part of an :doc:`Airflow Plugin ` +To include the listener in your Airflow installation, include it as a part of an :doc:`Airflow Plugin `. Listener API is meant to be called across all DAGs and all operators. You can't listen to events generated by specific DAGs. For that behavior, try methods like ``on_success_callback`` and ``pre_execute``. These provide callbacks for particular DAG authors or operator creators. 
The logs and ``print()`` calls will be handled as part of the listeners. - - -|experimental| diff --git a/newsfragments/1234.significant.rst b/newsfragments/1234.significant.rst new file mode 100644 index 00..7ecd81ad00 --- /dev/null +++ b/newsfragments/1234.significant.rst @@ -0,0 +1,18 @@ +Since Airflow 2.9.0, following Listener API methods are not considered experimental anymore: + +Lifecycle events: + +- ``on_starting`` +- ``before_stopping`` + +DagRun State Change Events: + +- ``on_dag_run_running`` +- ``on_dag_run_success`` +- ``on_dag_run_failed`` + +TaskInstance State Change Events: + +- ``on_task_instance_running`` +- ``on_task_instance_success`` +- ``on_task_instance_failed``
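As the doc change notes, a listener implements only the hookspec methods it needs; Pluggy dispatches each event to whichever registered listeners define it and validates the named parameters against the spec. A Pluggy-free sketch of that dispatch rule (the manager below is a simplified stand-in, not Airflow's implementation):

```python
class LineageListener:
    """Implements only one hook from the spec; the rest are simply absent."""

    def __init__(self):
        self.succeeded = []

    def on_task_instance_success(self, task_id):
        self.succeeded.append(task_id)


class ListenerManager:
    # Stand-in for Pluggy: call a hook on every listener that defines it,
    # silently skipping listeners that don't implement that method.
    def __init__(self):
        self._listeners = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def hook(self, name, **kwargs):
        for listener in self._listeners:
            impl = getattr(listener, name, None)
            if callable(impl):
                impl(**kwargs)


manager = ListenerManager()
listener = LineageListener()
manager.add_listener(listener)
manager.hook("on_task_instance_success", task_id="load")
manager.hook("on_dag_run_running")  # unimplemented here: no error, no effect
```

Real Pluggy additionally rejects implementations whose parameter names don't match the hookspec, which this sketch omits.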
(airflow) branch hooks-lineage updated (1d4c424fb0 -> f15a1e0101)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch hooks-lineage
in repository https://gitbox.apache.org/repos/asf/airflow.git

 discard 1d4c424fb0 hooks poc
     add e37fe8f576 Add `jsonpath_ng.ext.parse` support for `SqsSensor` (#36170)
     add 1eca667e5f Create FAB provider and move FAB auth manager in it (#35926)
     add 5439b494b0 Add helper function for CRUD operations on weaviate's schema and class objects (#35919)
     add f133643978 Add --forward-credentials flag to test commmands (#36176)
     add 357355ac09 Remove `is_authorized_cluster_activity` from auth manager (#36175)
     add 47a9c8a4a4 Added Datascan Profiling (#35696)
     add a7bb9e2393 Include removed providers when preparing packages (#36160)
     add e9b76bc38d Update providers metadata 2023-12-12 (#36184)
     add f8b322d61c Add pre-commmit check for providers list in bug report is unique/sorted (#36183)
     new f15a1e0101 hooks poc

This update added new revisions after undoing existing revisions. That is
to say, some revisions that were in the old version of the branch are not
in the new version. This situation occurs when a user --force pushes a
change and generates a repository containing something like this:

 * -- * -- B -- O -- O -- O (1d4c424fb0)
            \
             N -- N -- N refs/heads/hooks-lineage (f15a1e0101)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from
the common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them. Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails. The revisions listed as "add" were
already present in the repository and have only been added to this reference.
Summary of changes: .../airflow_providers_bug_report.yml |5 +- .pre-commit-config.yaml|9 +- CONTRIBUTING.rst |2 +- INSTALL|2 +- STATIC_CODE_CHECKS.rst |2 + airflow/api/auth/backend/basic_auth.py |8 +- airflow/api/auth/backend/kerberos_auth.py |2 +- .../endpoints/forward_to_fab_endpoint.py |4 +- .../schemas/role_and_permission_schema.py |2 +- airflow/api_connexion/schemas/user_schema.py |2 +- airflow/api_connexion/security.py |2 +- airflow/auth/managers/base_auth_manager.py | 14 - .../managers/fab/api/auth/backend/basic_auth.py| 55 +- .../managers/fab/api/auth/backend/kerberos_auth.py | 27 +- .../auth/managers/fab/security_manager/override.py | 2659 +--- airflow/cli/commands/standalone_command.py |2 +- airflow/config_templates/config.yml|2 +- airflow/lineage/hook.py| 17 +- .../versions/0073_2_0_0_prefix_dag_permissions.py |2 +- .../amazon/aws/auth_manager/aws_auth_manager.py|3 - airflow/providers/amazon/aws/hooks/s3.py | 10 + airflow/providers/amazon/aws/sensors/sqs.py| 11 +- airflow/providers/amazon/aws/triggers/sqs.py |6 +- airflow/providers/amazon/aws/utils/sqs.py | 20 +- airflow/providers/fab/CHANGELOG.rst| 26 + .../fab/cli_commands => providers/fab}/__init__.py |0 .../fab/auth_manager}/__init__.py |0 .../fab/auth_manager/api}/__init__.py |0 .../fab/auth_manager}/api/auth/__init__.py |0 .../fab/auth_manager/api/auth/backend}/__init__.py |0 .../auth_manager}/api/auth/backend/basic_auth.py | 13 +- .../api/auth/backend/kerberos_auth.py | 15 +- .../fab/auth_manager}/api_endpoints/__init__.py|0 .../api_endpoints/role_and_permission_endpoint.py | 31 +- .../auth_manager}/api_endpoints/user_endpoint.py | 23 +- .../fab/auth_manager/cli_commands}/__init__.py |0 .../fab/auth_manager}/cli_commands/definition.py | 34 +- .../fab/auth_manager}/cli_commands/role_command.py |6 +- .../cli_commands/sync_perm_command.py |2 +- .../fab/auth_manager}/cli_commands/user_command.py |2 +- .../fab/auth_manager}/cli_commands/utils.py|0 .../fab/auth_manager/decorators}/__init__.py |0 
.../fab/auth_manager}/decorators/auth.py | 10 +- .../fab/auth_manager}/fab_auth_manager.py | 31 +- .../fab/auth_manager}/models/__init__.py |0 .../fab/auth_manager}/models/anonymous_user.py |0 .../fab/auth_manager/openapi}/__init__.py |0 .../fab/auth_manager}/openapi/v1.yaml | 22 +- .../fab/auth_manager/security_manager}/__init__.py
(airflow) 01/01: hooks poc
This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch hooks-lineage in repository https://gitbox.apache.org/repos/asf/airflow.git commit f15a1e01011f1a51fec82d5fcfefa44ba3bc1778 Author: Maciej Obuchowski AuthorDate: Fri Nov 3 16:49:02 2023 +0100 hooks poc Signed-off-by: Maciej Obuchowski datasets from hooks Signed-off-by: Maciej Obuchowski --- airflow/api_connexion/schemas/dataset_schema.py| 1 + airflow/datasets/__init__.py | 3 +- airflow/datasets/manager.py| 6 +- airflow/hooks/base.py | 17 + airflow/lineage/hook.py| 94 ++ .../0134_2_9_0_add_automatic_field_to_dataset_.py | 48 +++ airflow/models/dataset.py | 2 + airflow/operators/python.py| 63 airflow/providers/amazon/aws/hooks/s3.py | 18 ++ airflow/providers/google/cloud/hooks/gcs.py| 28 ++ .../providers/google/cloud/transfers/gcs_to_gcs.py | 42 +-- .../providers/google/cloud/transfers/s3_to_gcs.py | 16 + .../providers/openlineage/extractors/manager.py| 64 +++- airflow/providers/openlineage/plugins/listener.py | 2 + airflow/providers/openlineage/utils/utils.py | 4 +- docs/apache-airflow/img/airflow_erd.sha256 | 6 +- docs/apache-airflow/img/airflow_erd.svg| 346 +++-- docs/apache-airflow/migrations-ref.rst | 6 +- 18 files changed, 560 insertions(+), 206 deletions(-) diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py index bfdd0d2423..04307f1ae8 100644 --- a/airflow/api_connexion/schemas/dataset_schema.py +++ b/airflow/api_connexion/schemas/dataset_schema.py @@ -73,6 +73,7 @@ class DatasetSchema(SQLAlchemySchema): updated_at = auto_field() producing_tasks = fields.List(fields.Nested(TaskOutletDatasetReferenceSchema)) consuming_dags = fields.List(fields.Nested(DagScheduleDatasetReferenceSchema)) +automatic = auto_field() class DatasetCollection(NamedTuple): diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 0dc635a00b..a5bd88adb7 100644 --- a/airflow/datasets/__init__.py +++ 
b/airflow/datasets/__init__.py @@ -28,8 +28,9 @@ class Dataset: uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)]) extra: dict[str, Any] | None = None +automatic: bool = False -__version__: ClassVar[int] = 1 +__version__: ClassVar[int] = 2 @uri.validator def _check_uri(self, attr, uri: str): diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py index 08871c9f65..1742ef9bf6 100644 --- a/airflow/datasets/manager.py +++ b/airflow/datasets/manager.py @@ -52,7 +52,11 @@ class DatasetManager(LoggingMixin): session.flush() for dataset_model in dataset_models: -self.notify_dataset_created(dataset=Dataset(uri=dataset_model.uri, extra=dataset_model.extra)) +self.notify_dataset_created( +dataset=Dataset( +uri=dataset_model.uri, extra=dataset_model.extra, automatic=dataset_model.automatic +) +) def register_dataset_change( self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py index 6ec0a8938e..f490cfb202 100644 --- a/airflow/hooks/base.py +++ b/airflow/hooks/base.py @@ -27,6 +27,7 @@ from airflow.typing_compat import Protocol from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: +from airflow.datasets import Dataset from airflow.models.connection import Connection # Avoid circular imports. 
log = logging.getLogger(__name__) @@ -106,6 +107,22 @@ class BaseHook(LoggingMixin): def get_ui_field_behaviour(cls) -> dict[str, Any]: return {} +def add_input_dataset(self, dataset: Dataset): +from airflow.lineage.hook import get_hook_lineage_collector + +get_hook_lineage_collector().add_input(dataset, self) + +def add_output_dataset(self, dataset: Dataset): +from airflow.lineage.hook import get_hook_lineage_collector + +get_hook_lineage_collector().add_output(dataset, self) + +def ol_to_airflow_dataset(self, ol_dataset): +raise NotImplementedError() + +def airflow_to_ol_dataset(self, dataset): +raise NotImplementedError() + class DiscoverableHook(Protocol): """ diff --git a/airflow/lineage/hook.py b/airflow/lineage/hook.py new file mode 100644 index 00..1b64d233ef --- /dev/null +++ b/airflow/lineage/hook.py @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE f
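In the PoC above, `BaseHook.add_input_dataset` and `add_output_dataset` delegate to a collector obtained via `get_hook_lineage_collector()`. A sketch of such a collector, under the assumption that it is a lazily created module-level singleton so every hook in the process reports into one place (the `Dataset` class here is a minimal stand-in for `airflow.datasets.Dataset`):

```python
from dataclasses import dataclass, field


@dataclass
class Dataset:
    # minimal stand-in for airflow.datasets.Dataset
    uri: str
    automatic: bool = False


@dataclass
class HookLineageCollector:
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)

    def add_input(self, dataset, hook):
        self.inputs.append((dataset, hook))

    def add_output(self, dataset, hook):
        self.outputs.append((dataset, hook))


_collector = None


def get_hook_lineage_collector():
    # lazy module-level singleton, matching the deferred import in BaseHook:
    # hooks created anywhere in the process share one collector instance
    global _collector
    if _collector is None:
        _collector = HookLineageCollector()
    return _collector
```

Registering datasets from inside hook methods (rather than operators) is what lets transfers like `s3_to_gcs` emit lineage without a dedicated extractor.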
(airflow) 01/01: hooks poc
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch hooks-lineage
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1d4c424fb03f97d4a7f974de33722b8c5f82b3b7
Author: Maciej Obuchowski
AuthorDate: Fri Nov 3 16:49:02 2023 +0100

    hooks poc

    Signed-off-by: Maciej Obuchowski

    datasets from hooks

    Signed-off-by: Maciej Obuchowski
---
 airflow/api_connexion/schemas/dataset_schema.py    |   1 +
 airflow/datasets/__init__.py                       |   3 +-
 airflow/datasets/manager.py                        |   6 +-
 airflow/hooks/base.py                              |  17 +
 airflow/lineage/hook.py                            |  87 ++
 .../0134_2_9_0_add_automatic_field_to_dataset_.py  |  48 +++
 airflow/models/dataset.py                          |   2 +
 airflow/operators/python.py                        |  63 
 airflow/providers/amazon/aws/hooks/s3.py           |   8 +
 airflow/providers/google/cloud/hooks/gcs.py        |  28 ++
 .../providers/google/cloud/transfers/gcs_to_gcs.py |  42 +--
 .../providers/google/cloud/transfers/s3_to_gcs.py  |  16 +
 .../providers/openlineage/extractors/manager.py    |  64 +++-
 airflow/providers/openlineage/plugins/listener.py  |   3 +
 docs/apache-airflow/img/airflow_erd.sha256         |   6 +-
 docs/apache-airflow/img/airflow_erd.svg            | 346 +++--
 docs/apache-airflow/migrations-ref.rst             |   6 +-
 17 files changed, 542 insertions(+), 204 deletions(-)

diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index bfdd0d2423..04307f1ae8 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -73,6 +73,7 @@ class DatasetSchema(SQLAlchemySchema):
     updated_at = auto_field()
     producing_tasks = fields.List(fields.Nested(TaskOutletDatasetReferenceSchema))
     consuming_dags = fields.List(fields.Nested(DagScheduleDatasetReferenceSchema))
+    automatic = auto_field()
 
 
 class DatasetCollection(NamedTuple):
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 0dc635a00b..a5bd88adb7 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -28,8 +28,9 @@ class Dataset:
     uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)])
     extra: dict[str, Any] | None = None
+    automatic: bool = False
 
-    __version__: ClassVar[int] = 1
+    __version__: ClassVar[int] = 2
 
     @uri.validator
     def _check_uri(self, attr, uri: str):
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 08871c9f65..1742ef9bf6 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -52,7 +52,11 @@ class DatasetManager(LoggingMixin):
             session.flush()
 
         for dataset_model in dataset_models:
-            self.notify_dataset_created(dataset=Dataset(uri=dataset_model.uri, extra=dataset_model.extra))
+            self.notify_dataset_created(
+                dataset=Dataset(
+                    uri=dataset_model.uri, extra=dataset_model.extra, automatic=dataset_model.automatic
+                )
+            )
 
     def register_dataset_change(
         self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py
index 6ec0a8938e..f490cfb202 100644
--- a/airflow/hooks/base.py
+++ b/airflow/hooks/base.py
@@ -27,6 +27,7 @@ from airflow.typing_compat import Protocol
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
+    from airflow.datasets import Dataset
     from airflow.models.connection import Connection  # Avoid circular imports.
 
 log = logging.getLogger(__name__)
@@ -106,6 +107,22 @@ class BaseHook(LoggingMixin):
     def get_ui_field_behaviour(cls) -> dict[str, Any]:
         return {}
 
+    def add_input_dataset(self, dataset: Dataset):
+        from airflow.lineage.hook import get_hook_lineage_collector
+
+        get_hook_lineage_collector().add_input(dataset, self)
+
+    def add_output_dataset(self, dataset: Dataset):
+        from airflow.lineage.hook import get_hook_lineage_collector
+
+        get_hook_lineage_collector().add_output(dataset, self)
+
+    def ol_to_airflow_dataset(self, ol_dataset):
+        raise NotImplementedError()
+
+    def airflow_to_ol_dataset(self, dataset):
+        raise NotImplementedError()
+
 
 class DiscoverableHook(Protocol):
     """
diff --git a/airflow/lineage/hook.py b/airflow/lineage/hook.py
new file mode 100644
index 00..e85187bff0
--- /dev/null
+++ b/airflow/lineage/hook.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding c
(airflow) branch hooks-lineage updated (6ae8fb11d9 -> 1d4c424fb0)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch hooks-lineage
in repository https://gitbox.apache.org/repos/asf/airflow.git

 discard  6ae8fb11d9 hooks poc
     add  7e1f67b4b8 Fix a bug in method name used in `GlacierToGCSOperator` (#35978)
     add  ddb4370dc6 Update `set_context` signature to match superclass one and stop setting the instance attribute (#35975)
     add  4a9824de8e Replace default empty dict value by None in AzureBlobStorageToS3Operator (#35977)
     add  20db3dbfa9 Fix nits in LOCAL_VIRTUALENV.rst (#35985)
     add  cbb9c4f8cc Fix airflow db shell needing an extra keypress to exit (#35982)
     add  a9132f3891 Move RDS hook to a cached property in RDS trigger (#35990)
     add  8346fd58e8 Fix a bug with accessing hooks in EKS trigger (#35989)
     add  9bcee9d439 Bump FAB to 4.3.10 (#35991)
     add  eed6427b66 Avoid creating the hook in the EmrServerlessCancelJobsTrigger init (#35992)
     add  cf052dc64f Add feature to build "chicken-egg" packages from sources (#35890)
     add  396b1ba34e Cleanup code for elasticsearch<8 (#35707)
     add  9f212d4276 Refacto get_output_location in AthenaHook (#35996)
     add  ab87cd02e4 Pass conn ID to ObjectStoragePath via URI (#35913)
     add  4117f1b013 Switch "latest" image to point to newest supported Python version (#36003)
     add  8be03c9937 Add a cache for weaviate client (#35983)
     add  217c89325a Fix warning message in `Connection.get_hook` in case of ImportError (#36005)
     add  8829d1732c Add support for chicken-egg providers to dockerhub release process (#36002)
     add  b1e547ef53 Refactor some methods in EmrContainerHook (#35999)
     add  decc6d9414 Improve typing hints for only_client_type decorator (#35997)
     add  8f2cf41538 Fix reraise outside of try block in `AzureSynapsePipelineRunLink.get_fields_from_url` (#36009)
     add  c26aa12bcc [AIP-44] Introduce Pydantic model for LogTemplate (#36004)
     add  fd03dc2933 Fix reraise outside of try block in `AthenaHook.get_output_location` (#36008)
     add  86b1bd22d1 Fix CloudRunExecuteJobOperator not able to retrieve the Cloud Run job status in deferrable mode (#36012)
     add  1c6bbe2841 Feature/trino provider timezone (#35963)
     add  9e28475402 Add multiselect to run state in grid view (#35403)
     add  8d0229b8f6 Sort build options in breeze commands (#36013)
     add  dde4065995 Add semi-automated misc classification for providers release (#36018)
     add  41f4766d5b Update branch name in release guide (#36020)
     add  e331559831 Support DOCKER_HOST variable passing to Breeze commands (#36011)
     add  7f049aa868 Temporary limit pipx to let our CI work (#36027)
     add  f5d802791f Change Trigger UI to use HTTP POST in web ui (#36026)
     add  9c168b76e8 New breeze command to clean up previous provider artifacts (#35970)
     add  fd0988369b Use dropdown instead of buttons when there are more than 10 retries in log tab (#36025)
     add  549fac30ee 34058: Fix UI Grid error when DAG has been removed. (#36028)
     add  618fbae2c6 Improve readability and content of the security policy (#36023)
     add  9bd192eb0a Remove upper bound constraint on aiobotocore (#36031)
     add  3f354c0c92 Do not catch too broad exception in `WasbHook.delete_container` (#36034)
     add  9845b40a75 Limit Pytest-asyncio to < 0.23.1 (#36037)
     add  cc2521cf6c Limit pytest-asyncio even more - to <0.23.0 (#36040)
     add  395ac46349 Add the section describing the security model of DAG Author capabilities (#36022)
     add  55eac5e2c7 Remove pytest-asyncio upper-binding limitatin (#36046)
     add  0953e0f844 Mount selected sources of Airflow to image when running mypy checks (#36038)
     add  d0f4512ecb Fix DataprocSubmitJobOperator to retrieve failed job error message (#36053)
     add  77c01031d6 Add XCom tab to Grid (#35719)
     add  35a1b7a63a fix: Repair run_id for OpenLineage FAIL events (#36051)
     add  e0df7441fa Fix a bug in get_iam_token for Redshift Serverless (#36001)
     add  04a781666b Fix handling of single quotes in `RedshiftToS3Operator` (#35986)
     add  55d81378b0 Update supported-versions.rst (#36058)
     add  61fd166a46 Avoid crushing container when directory is not found on rm (#36050)
     add  7ececfdb21 Update reset_user_sessions to work from either CLI or web (#36056)
     add  15406d412a Do not skip mounting sources on tagged builds by default (#36060)
     add  de71a62848 fix: KPO typing env_vars (#36048)
     add  1264316fe7 Drive-by improvements to convert_env_vars (#36062)
     add  0376e9324a Fix gantt chart queued duration when queued_dttm is greater than start_date for deferred tasks. (#35984)
     add  f83bf9366b Update `example_ec2` to Create Instance Which Supports Hibernation (#35790) (#35839)
     add  acd95a5ef1 feat: Add parent_run_id for COMPLETE and FAIL events (#36067)
     add  8ae67a04e9 Add read access to pools for viewer role (#35352)
     add  acf91af6bf Update `b
(airflow) 02/02: datasets from hooks
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch hooks-lineage
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 351f7fe2271812ea6ba43c2ebce65c3f163854fa
Author: Maciej Obuchowski
AuthorDate: Mon Dec 11 18:41:54 2023 +0100

    datasets from hooks

    Signed-off-by: Maciej Obuchowski
---
 airflow/api_connexion/schemas/dataset_schema.py    |   1 +
 airflow/datasets/__init__.py                       |   3 +-
 airflow/datasets/manager.py                        |   6 +-
 airflow/hooks/base.py                              |  22 +-
 airflow/lineage/hook.py                            |  43 ++-
 .../0134_2_9_0_add_automatic_field_to_dataset_.py  |  48 +++
 airflow/models/dataset.py                          |   2 +
 airflow/providers/amazon/aws/hooks/s3.py           |  17 +-
 airflow/providers/google/cloud/hooks/gcs.py        |  31 +-
 .../providers/google/cloud/transfers/gcs_to_gcs.py |  42 +--
 .../providers/openlineage/extractors/manager.py    |  58 +++-
 airflow/providers/openlineage/plugins/listener.py  |   3 +
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/img/airflow_erd.svg            | 346 +++--
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 15 files changed, 383 insertions(+), 245 deletions(-)

diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index bfdd0d2423..04307f1ae8 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -73,6 +73,7 @@ class DatasetSchema(SQLAlchemySchema):
     updated_at = auto_field()
     producing_tasks = fields.List(fields.Nested(TaskOutletDatasetReferenceSchema))
     consuming_dags = fields.List(fields.Nested(DagScheduleDatasetReferenceSchema))
+    automatic = auto_field()
 
 
 class DatasetCollection(NamedTuple):
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 0dc635a00b..a5bd88adb7 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -28,8 +28,9 @@ class Dataset:
     uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)])
     extra: dict[str, Any] | None = None
+    automatic: bool = False
 
-    __version__: ClassVar[int] = 1
+    __version__: ClassVar[int] = 2
 
     @uri.validator
     def _check_uri(self, attr, uri: str):
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 08871c9f65..1742ef9bf6 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -52,7 +52,11 @@ class DatasetManager(LoggingMixin):
             session.flush()
 
         for dataset_model in dataset_models:
-            self.notify_dataset_created(dataset=Dataset(uri=dataset_model.uri, extra=dataset_model.extra))
+            self.notify_dataset_created(
+                dataset=Dataset(
+                    uri=dataset_model.uri, extra=dataset_model.extra, automatic=dataset_model.automatic
+                )
+            )
 
     def register_dataset_change(
         self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py
index a989c327c4..f490cfb202 100644
--- a/airflow/hooks/base.py
+++ b/airflow/hooks/base.py
@@ -23,11 +23,11 @@ import warnings
 from typing import TYPE_CHECKING, Any
 
 from airflow.exceptions import RemovedInAirflow3Warning
-from airflow.lineage.hook import get_hook_lineage_collector
 from airflow.typing_compat import Protocol
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
+    from airflow.datasets import Dataset
     from airflow.models.connection import Connection  # Avoid circular imports.
 
 log = logging.getLogger(__name__)
@@ -107,17 +107,21 @@ class BaseHook(LoggingMixin):
     def get_ui_field_behaviour(cls) -> dict[str, Any]:
         return {}
 
-    @classmethod
-    def add_input_dataset(cls, name: str, namespace: str):
-        from openlineage.client.run import Dataset as OpenLineageDataset
+    def add_input_dataset(self, dataset: Dataset):
+        from airflow.lineage.hook import get_hook_lineage_collector
 
-        get_hook_lineage_collector().add_input(OpenLineageDataset(namespace=namespace, name=name))
+        get_hook_lineage_collector().add_input(dataset, self)
 
-    @classmethod
-    def add_output_dataset(cls, name: str, namespace: str):
-        from openlineage.client.run import Dataset as OpenLineageDataset
+    def add_output_dataset(self, dataset: Dataset):
+        from airflow.lineage.hook import get_hook_lineage_collector
+
+        get_hook_lineage_collector().add_output(dataset, self)
 
-        get_hook_lineage_collector().add_output(OpenLineageDataset(namespace=namespace, name=name))
+    def ol_to_airflow_dataset(self, ol_dataset):
+        raise NotImplementedError()
+
+
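The 02/02 revision changes the reporting API's shape: the earlier classmethods took OpenLineage `name`/`namespace` strings and the collector never learned which hook made the call, while the new instance methods take an Airflow `Dataset` and pass `self` along, keeping each collected dataset attributable to its source hook. A standalone sketch contrasting the two call shapes (`Dataset` and `Collector` are minimal stand-ins, not the real Airflow classes):

```python
from dataclasses import dataclass


@dataclass
class Dataset:
    uri: str  # minimal stand-in for airflow.datasets.Dataset


class Collector:
    def __init__(self):
        self.inputs = []

    def add_input(self, dataset, hook=None):
        # the hook argument is what the 02/02 revision adds
        self.inputs.append((dataset, hook))


collector = Collector()


class OldStyleHook:
    @classmethod
    def add_input_dataset(cls, name: str, namespace: str):
        # 01/02 shape: OpenLineage-style naming, no hook attribution
        collector.add_input(Dataset(uri=f"{namespace}/{name}"))


class NewStyleHook:
    def add_input_dataset(self, dataset: Dataset):
        # 02/02 shape: Airflow Dataset in, `self` recorded alongside it
        collector.add_input(dataset, self)


OldStyleHook.add_input_dataset(name="key", namespace="s3://bucket")
NewStyleHook().add_input_dataset(Dataset(uri="s3://bucket/key"))

print([(d.uri, type(h).__name__ if h else None) for d, h in collector.inputs])
# → [('s3://bucket/key', None), ('s3://bucket/key', 'NewStyleHook')]
```

Deferring the `get_hook_lineage_collector` import into the method body (as the real diff does) is what lets the top-level `from airflow.lineage.hook import ...` be removed, breaking the circular-import risk between `hooks.base` and `lineage.hook`.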
(airflow) 01/01: hooks poc
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch hooks-lineage
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6ae8fb11d924c1a863e3445d803beb2d85d31f4b
Author: Maciej Obuchowski
AuthorDate: Fri Nov 3 16:49:02 2023 +0100

    hooks poc

    Signed-off-by: Maciej Obuchowski

    datasets from hooks

    Signed-off-by: Maciej Obuchowski
---
 airflow/api_connexion/schemas/dataset_schema.py    |   1 +
 airflow/datasets/__init__.py                       |   3 +-
 airflow/datasets/manager.py                        |   6 +-
 airflow/hooks/base.py                              |  17 +
 airflow/lineage/hook.py                            |  87 ++
 .../0134_2_9_0_add_automatic_field_to_dataset_.py  |  48 +++
 airflow/models/dataset.py                          |   2 +
 airflow/operators/python.py                        |  63 
 airflow/providers/amazon/aws/hooks/s3.py           |   8 +
 airflow/providers/google/cloud/hooks/gcs.py        |  28 ++
 .../providers/google/cloud/transfers/gcs_to_gcs.py |  42 +--
 .../providers/google/cloud/transfers/s3_to_gcs.py  |  16 +
 .../providers/openlineage/extractors/manager.py    |  64 +++-
 airflow/providers/openlineage/plugins/listener.py  |   3 +
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/img/airflow_erd.svg            | 346 +++--
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 17 files changed, 532 insertions(+), 208 deletions(-)

diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index bfdd0d2423..04307f1ae8 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -73,6 +73,7 @@ class DatasetSchema(SQLAlchemySchema):
     updated_at = auto_field()
     producing_tasks = fields.List(fields.Nested(TaskOutletDatasetReferenceSchema))
     consuming_dags = fields.List(fields.Nested(DagScheduleDatasetReferenceSchema))
+    automatic = auto_field()
 
 
 class DatasetCollection(NamedTuple):
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 0dc635a00b..a5bd88adb7 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -28,8 +28,9 @@ class Dataset:
     uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)])
     extra: dict[str, Any] | None = None
+    automatic: bool = False
 
-    __version__: ClassVar[int] = 1
+    __version__: ClassVar[int] = 2
 
     @uri.validator
     def _check_uri(self, attr, uri: str):
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 08871c9f65..1742ef9bf6 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -52,7 +52,11 @@ class DatasetManager(LoggingMixin):
             session.flush()
 
         for dataset_model in dataset_models:
-            self.notify_dataset_created(dataset=Dataset(uri=dataset_model.uri, extra=dataset_model.extra))
+            self.notify_dataset_created(
+                dataset=Dataset(
+                    uri=dataset_model.uri, extra=dataset_model.extra, automatic=dataset_model.automatic
+                )
+            )
 
     def register_dataset_change(
         self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py
index 6ec0a8938e..f490cfb202 100644
--- a/airflow/hooks/base.py
+++ b/airflow/hooks/base.py
@@ -27,6 +27,7 @@ from airflow.typing_compat import Protocol
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
+    from airflow.datasets import Dataset
    from airflow.models.connection import Connection  # Avoid circular imports.
 
 log = logging.getLogger(__name__)
@@ -106,6 +107,22 @@ class BaseHook(LoggingMixin):
     def get_ui_field_behaviour(cls) -> dict[str, Any]:
         return {}
 
+    def add_input_dataset(self, dataset: Dataset):
+        from airflow.lineage.hook import get_hook_lineage_collector
+
+        get_hook_lineage_collector().add_input(dataset, self)
+
+    def add_output_dataset(self, dataset: Dataset):
+        from airflow.lineage.hook import get_hook_lineage_collector
+
+        get_hook_lineage_collector().add_output(dataset, self)
+
+    def ol_to_airflow_dataset(self, ol_dataset):
+        raise NotImplementedError()
+
+    def airflow_to_ol_dataset(self, dataset):
+        raise NotImplementedError()
+
 
 class DiscoverableHook(Protocol):
     """
diff --git a/airflow/lineage/hook.py b/airflow/lineage/hook.py
new file mode 100644
index 00..e85187bff0
--- /dev/null
+++ b/airflow/lineage/hook.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding c
(airflow) branch hooks-lineage updated (351f7fe227 -> 6ae8fb11d9)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a change to branch hooks-lineage
in repository https://gitbox.apache.org/repos/asf/airflow.git

 discard  351f7fe227 datasets from hooks
 discard  64e522eead hooks poc
     new  6ae8fb11d9 hooks poc

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (351f7fe227)
            \
             N -- N -- N   refs/heads/hooks-lineage (6ae8fb11d9)

You should already have received notification emails for all of
the O revisions, and so the following emails describe only the N
revisions from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
(airflow) 01/02: hooks poc
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch hooks-lineage
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 64e522eead3dcf6a3b35e4bed96363080cd01a7a
Author: Maciej Obuchowski
AuthorDate: Fri Nov 3 16:49:02 2023 +0100

    hooks poc

    Signed-off-by: Maciej Obuchowski
---
 airflow/hooks/base.py                              | 13 +
 airflow/lineage/hook.py                            | 60 +
 airflow/operators/python.py                        | 63 ++
 airflow/providers/amazon/aws/hooks/s3.py           | 15 ++
 airflow/providers/google/cloud/hooks/gcs.py        | 11 
 .../providers/google/cloud/transfers/s3_to_gcs.py  | 16 ++
 .../providers/openlineage/extractors/manager.py    |  8 +++
 7 files changed, 186 insertions(+)

diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py
index 6ec0a8938e..a989c327c4 100644
--- a/airflow/hooks/base.py
+++ b/airflow/hooks/base.py
@@ -23,6 +23,7 @@ import warnings
 from typing import TYPE_CHECKING, Any
 
 from airflow.exceptions import RemovedInAirflow3Warning
+from airflow.lineage.hook import get_hook_lineage_collector
 from airflow.typing_compat import Protocol
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -106,6 +107,18 @@ class BaseHook(LoggingMixin):
     def get_ui_field_behaviour(cls) -> dict[str, Any]:
         return {}
 
+    @classmethod
+    def add_input_dataset(cls, name: str, namespace: str):
+        from openlineage.client.run import Dataset as OpenLineageDataset
+
+        get_hook_lineage_collector().add_input(OpenLineageDataset(namespace=namespace, name=name))
+
+    @classmethod
+    def add_output_dataset(cls, name: str, namespace: str):
+        from openlineage.client.run import Dataset as OpenLineageDataset
+
+        get_hook_lineage_collector().add_output(OpenLineageDataset(namespace=namespace, name=name))
+
 
 class DiscoverableHook(Protocol):
     """
diff --git a/airflow/lineage/hook.py b/airflow/lineage/hook.py
new file mode 100644
index 00..6dee97f030
--- /dev/null
+++ b/airflow/lineage/hook.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import typing
+
+from airflow.compat.functools import cache
+from airflow.utils.module_loading import import_string
+
+if typing.TYPE_CHECKING:
+    from openlineage.client.run import Dataset as OpenLineageDataset
+
+
+class LineageCollector:
+    """Info."""
+
+    def __init__(self):
+        self.inputs: list[OpenLineageDataset] = []
+        self.outputs: list[OpenLineageDataset] = []
+
+    def add_input(self, input: OpenLineageDataset):
+        self.inputs.append(input)
+
+    def add_output(self, output: OpenLineageDataset):
+        self.outputs.append(output)
+
+    @property
+    def collected(self) -> tuple[list[OpenLineageDataset], list[OpenLineageDataset]]:
+        return self.inputs, self.outputs
+
+    def has_collected(self) -> bool:
+        return len(self.inputs) != 0 and len(self.outputs) != 0
+
+
+_collector = LineageCollector()
+
+
+@cache
+def does_openlineage_exist() -> bool:
+    is_disabled = import_string("apache.airflow.providers.openlineage.plugin._is_disabled")
+    return is_disabled and is_disabled()
+
+
+def get_hook_lineage_collector() -> LineageCollector:
+    return _collector
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 666bc9161a..0bd184084c 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -45,6 +45,7 @@ from airflow.exceptions import (
     DeserializingResultError,
     RemovedInAirflow3Warning,
 )
+from airflow.lineage.hook import get_hook_lineage_collector
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskinstance import _CURRENT_CONTEXT
@@ -304,6 +305,68 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
         # returns the result of the super execute method as it is instead of returning None
         return condition
 
+    def get_openlinea
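One detail of the `LineageCollector` in this first revision is worth noting: `has_collected` combines the two checks with `and`, so it returns `True` only once the collector holds both inputs *and* outputs; a run that only reads (or only writes) reports `False`. Whether that is the intended semantics is not stated in the patch. A quick standalone check of the behavior as written (trimmed restatement of the class, with the `OpenLineageDataset` type replaced by plain strings for brevity):

```python
class LineageCollector:
    """Trimmed restatement of the collector from the patch above."""

    def __init__(self):
        self.inputs = []
        self.outputs = []

    def add_input(self, input):
        self.inputs.append(input)

    def add_output(self, output):
        self.outputs.append(output)

    @property
    def collected(self):
        return self.inputs, self.outputs

    def has_collected(self) -> bool:
        # note: `and`, not `or` -- both sides must be non-empty
        return len(self.inputs) != 0 and len(self.outputs) != 0


c = LineageCollector()
c.add_input("s3://bucket/in.csv")
print(c.has_collected())   # → False: inputs alone are not enough
c.add_output("gcs://bucket/out.csv")
print(c.has_collected())   # → True: both sides now populated
```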
(airflow) branch main updated: Remove ClassVar annotations. (#36084)
This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new fba682b1a5 Remove ClassVar annotations. (#36084)
fba682b1a5 is described below

commit fba682b1a54a7936e955be1dbfae8e0e6f7a9443
Author: Jakub Dardzinski
AuthorDate: Wed Dec 6 15:16:27 2023 +0100

    Remove ClassVar annotations. (#36084)

    Signed-off-by: Jakub Dardzinski
---
 airflow/providers/openlineage/plugins/facets.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/facets.py b/airflow/providers/openlineage/plugins/facets.py
index b50db6c267..4037b9026a 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -28,7 +28,7 @@ class AirflowMappedTaskRunFacet(BaseFacet):
     mapIndex: int
     operatorClass: str
 
-    _additional_skip_redact: list[str] = ["operatorClass"]
+    _additional_skip_redact = ["operatorClass"]
 
     @classmethod
     def from_task_instance(cls, task_instance):
@@ -63,7 +63,7 @@ class UnknownOperatorInstance(RedactMixin):
     properties: dict[str, object]
     type: str = "operator"
 
-    _skip_redact: list[str] = ["name", "type"]
+    _skip_redact = ["name", "type"]
 
 
 @define(slots=False)
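The mechanism behind this fix: attrs-decorated classes (like the facets here, defined with `@define`) treat *annotated* class-level assignments as fields, so dropping the `list[str]` annotation turns `_skip_redact` back into a plain class attribute that the field machinery ignores. Stdlib dataclasses follow the same annotation-driven rule, which makes for a dependency-free sketch (the `Facet` class below is hypothetical, not the real `BaseFacet`):

```python
from dataclasses import dataclass, fields


@dataclass
class Facet:
    # annotated assignments become dataclass fields...
    name: str
    type: str = "operator"

    # ...while an unannotated assignment stays a plain class attribute,
    # invisible to the field machinery -- the effect of removing the
    # `list[str]` annotation in the commit above
    _skip_redact = ["name", "type"]


f = Facet(name="my_facet")
print([fld.name for fld in fields(Facet)])  # → ['name', 'type']
print(f._skip_redact)                       # → ['name', 'type']
```

Note the commit title says "ClassVar annotations" while the diff actually removes plain `list[str]` annotations; a `typing.ClassVar` annotation would also have kept these out of the field set, so either spelling avoids the collision.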