(airflow) branch main updated: doc: add datadog connection json structure. (#40198)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 86190aaa4a doc: add datadog connection json structure. (#40198) 86190aaa4a is described below commit 86190aaa4a6601e86ba067ad4325e9fea7923511 Author: Gabe Schenz AuthorDate: Mon Jun 17 17:51:00 2024 -0500 doc: add datadog connection json structure. (#40198) --- .../connections/datadog.rst| 22 ++ 1 file changed, 22 insertions(+) diff --git a/docs/apache-airflow-providers-datadog/connections/datadog.rst b/docs/apache-airflow-providers-datadog/connections/datadog.rst index dccc1caf68..b56ad1f82b 100644 --- a/docs/apache-airflow-providers-datadog/connections/datadog.rst +++ b/docs/apache-airflow-providers-datadog/connections/datadog.rst @@ -44,3 +44,25 @@ Extra ``app_key``: Datadog `application key <https://docs.datadoghq.com/account_management/api-app-keys/#application-keys>`__ ``source_type_name``: Datadog `source type name <https://docs.datadoghq.com/integrations/faq/list-of-api-source-attribute-value/>`__ (defaults to my_apps). + + +Secret Management +----------------- + +When storing the connection details in a secret management system, it can be convenient to name the secret with the default value:: + + secret name: airflow/connections/datadog_default + +The following json is an example of what the secret contents should look like:: + + { +"conn_type": "datadog", +"description": "Datadog connection for my app", +"extra": { + "api_host": "https://api.datadoghq.com", + "api_key": "my api key", + "app_key": "my app key", + "source_type_name": "apache" +}, +"host": "environment-region-application.domain.com" + }
(airflow) branch main updated (a31b10edda -> 42a2b1a3bb)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from a31b10edda fix typo in example_params_ui_tutorial (#40094) add 42a2b1a3bb Fix aws assume role session creation when deferrable (#40051) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/hooks/base_aws.py| 11 --- tests/providers/amazon/aws/hooks/test_base_aws.py | 7 ++- 2 files changed, 14 insertions(+), 4 deletions(-)
(airflow) branch main updated: Doc-only: mention minimum boto3 1.34.52 for AWS provider when using Batch `ecs_properties_override` (#39983)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 99dd24b436 Doc-only: mention minimum boto3 1.34.52 for AWS provider when using Batch `ecs_properties_override` (#39983) 99dd24b436 is described below commit 99dd24b436c1a8c5f736e9784c60d469a5b2bed7 Author: Josh Dimarsky <24758845+yehoshuadimar...@users.noreply.github.com> AuthorDate: Tue Jun 4 16:50:28 2024 -0400 Doc-only: mention minimum boto3 1.34.52 for AWS provider when using Batch `ecs_properties_override` (#39983) --- airflow/providers/amazon/aws/operators/batch.py | 20 +++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/batch.py b/airflow/providers/amazon/aws/operators/batch.py index 849fc19346..6125c37140 100644 --- a/airflow/providers/amazon/aws/operators/batch.py +++ b/airflow/providers/amazon/aws/operators/batch.py @@ -30,6 +30,8 @@ from datetime import timedelta from functools import cached_property from typing import TYPE_CHECKING, Any, Sequence +import botocore + from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import BaseOperator @@ -45,7 +47,11 @@ from airflow.providers.amazon.aws.triggers.batch import ( BatchCreateComputeEnvironmentTrigger, BatchJobTrigger, ) -from airflow.providers.amazon.aws.utils import trim_none_values, validate_execute_complete_event +from airflow.providers.amazon.aws.utils import ( +get_botocore_version, +trim_none_values, +validate_execute_complete_event, +) from airflow.providers.amazon.aws.utils.task_log_fetcher import AwsTaskLogFetcher from airflow.utils.types import NOTSET @@ -66,6 +72,7 @@ class BatchOperator(BaseOperator): :param overrides: DEPRECATED, use container_overrides instead with the same value. 
:param container_overrides: the `containerOverrides` parameter for boto3 (templated) :param ecs_properties_override: the `ecsPropertiesOverride` parameter for boto3 (templated) +**NOTE** This requires `boto3` version 1.34.52+ :param node_overrides: the `nodeOverrides` parameter for boto3 (templated) :param share_identifier: The share identifier for the job. Don't specify this parameter if the job queue doesn't have a scheduling policy. @@ -323,6 +330,17 @@ class BatchOperator(BaseOperator): try: response = self.hook.client.submit_job(**trim_none_values(args)) +except botocore.exceptions.ParamValidationError as error: +if ( +'Unknown parameter in input: "ecsPropertiesOverride"' in str(error) +) and self.ecs_properties_override: +self.log.error( +"You are attempting to use ecsPropertiesOverride and the botocore API returned an " +"error message which may indicate the need to update botocore to do this. \n" +"Support for using ecsPropertiesOverride was added in botocore 1.34.52 and you are using botocore %s", +".".join(map(str, get_botocore_version())), +) +raise except Exception as e: self.log.error( "AWS Batch job failed submission - job definition: %s - on queue %s",
(airflow) branch main updated: Bedrock system test adjustment (#40032)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 8a32b940ce Bedrock system test adjustment (#40032) 8a32b940ce is described below commit 8a32b940ce1151ea0b973b0130428a4834b519c1 Author: D. Ferruzzi AuthorDate: Mon Jun 3 14:53:05 2024 -0700 Bedrock system test adjustment (#40032) --- .../providers/amazon/aws/example_bedrock_retrieve_and_generate.py| 5 + 1 file changed, 5 insertions(+) diff --git a/tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py b/tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py index c544cee7f8..fcebc8c40a 100644 --- a/tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py +++ b/tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py @@ -511,6 +511,11 @@ with DAG( ) # [END howto_operator_bedrock_create_data_source] +# In this demo, delete_data_source and delete_cluster are both trying to delete +# the data from the S3 bucket and occasionally hitting a conflict. This ensures that +# delete_data_source doesn't attempt to delete the files, leaving that duty to delete_bucket. +create_data_source.create_data_source_kwargs["dataDeletionPolicy"] = "RETAIN" + # [START howto_operator_bedrock_ingest_data] ingest_data = BedrockIngestDataOperator( task_id="ingest_data",
(airflow) branch main updated (1da7f1f433 -> ec2e245f0e)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 1da7f1f433 Pin requests due to incompatibility with docker-py (#39740) add ec2e245f0e Fetch served logs also when task attempt is up for retry and no remote logs available (#39496) No new revisions were added by this update. Summary of changes: airflow/utils/log/file_task_handler.py | 8 ++-- tests/utils/test_log_handlers.py | 24 +--- 2 files changed, 23 insertions(+), 9 deletions(-)
(airflow) branch main updated: Small refactor for example_bedrock_knowledge_base.py (#39672)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 4d0c7242bc Small refactor for example_bedrock_knowledge_base.py (#39672) 4d0c7242bc is described below commit 4d0c7242bcc8a403c03edd993b8b445a51720492 Author: D. Ferruzzi AuthorDate: Thu May 16 14:43:48 2024 -0700 Small refactor for example_bedrock_knowledge_base.py (#39672) - Renamed example_bedrock_retrieve_and_generate.py, a more accurate and descriptive name - Added an example for invoking Claude models and added that to the docs --- .../operators/bedrock.rst | 34 ++ .../operators/opensearchserverless.rst | 2 +- ...py => example_bedrock_retrieve_and_generate.py} | 27 - 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/docs/apache-airflow-providers-amazon/operators/bedrock.rst b/docs/apache-airflow-providers-amazon/operators/bedrock.rst index 1808f5138c..daf9301565 100644 --- a/docs/apache-airflow-providers-amazon/operators/bedrock.rst +++ b/docs/apache-airflow-providers-amazon/operators/bedrock.rst @@ -46,7 +46,10 @@ Invoke an existing Amazon Bedrock Model To invoke an existing Amazon Bedrock model, you can use :class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockInvokeModelOperator`. -Note that every model family has different input and output formats. +Note that every model family has different input and output formats. Some examples are included below, but +for details on the different formats, see +`Inference parameters for foundation models <https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters.html>`__ + For example, to invoke a Meta Llama model you would use: .. 
exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock.py @@ -63,7 +66,14 @@ To invoke an Amazon Titan model you would use: :start-after: [START howto_operator_invoke_titan_model] :end-before: [END howto_operator_invoke_titan_model] -For details on the different formats, see `Inference parameters for foundation models <https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters.html>`__ +To invoke a Claude V2 model using the Completions API you would use: + +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py +:language: python +:dedent: 4 +:start-after: [START howto_operator_invoke_claude_model] +:end-before: [END howto_operator_invoke_claude_model] + .. _howto/operator:BedrockCustomizeModelOperator: @@ -119,7 +129,7 @@ To create an Amazon Bedrock Knowledge Base, you can use For more information on which models support embedding data into a vector store, see https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html -.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py :language: python :dedent: 4 :start-after: [START howto_operator_bedrock_create_knowledge_base] @@ -132,7 +142,7 @@ Delete an Amazon Bedrock Knowledge Base Deleting a Knowledge Base is a simple boto API call and can be done in a TaskFlow task like the example below. -.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py +.. 
exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py :language: python :start-after: [START howto_operator_bedrock_delete_knowledge_base] :end-before: [END howto_operator_bedrock_delete_knowledge_base] @@ -145,7 +155,7 @@ Create an Amazon Bedrock Data Source To create an Amazon Bedrock Data Source, you can use :class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockCreateDataSourceOperator`. -.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py :language: python :dedent: 4 :start-after: [START howto_operator_bedrock_create_data_source] @@ -158,7 +168,7 @@ Delete an Amazon Bedrock Data Source Deleting a Data Source is a simple boto API call and can be done in a TaskFlow task like the example below. -.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py :language: python :start-after: [START howto_operator_bedrock_delete_data_source] :end-before: [END howto_operator_bedrock_delete_data_source] @@ -171,7 +181,7 @@ Ingest data into an Amazon Bedrock Data Source To add data from an Amazon S3 bucket
(airflow) branch main updated: Fix default value for aws batch operator retry strategy (#39608)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 9ea78d9d72 Fix default value for aws batch operator retry strategy (#39608) 9ea78d9d72 is described below commit 9ea78d9d726d9ddb6109a7fba0c3a838f8a05610 Author: 0x26res AuthorDate: Wed May 15 22:05:24 2024 +0100 Fix default value for aws batch operator retry strategy (#39608) Co-authored-by: aandres --- airflow/providers/amazon/aws/operators/batch.py| 4 +-- tests/providers/amazon/aws/operators/test_batch.py | 32 -- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/batch.py b/airflow/providers/amazon/aws/operators/batch.py index afca0fc615..00b6287145 100644 --- a/airflow/providers/amazon/aws/operators/batch.py +++ b/airflow/providers/amazon/aws/operators/batch.py @@ -206,9 +206,7 @@ class BatchOperator(BaseOperator): self.scheduling_priority_override = scheduling_priority_override self.array_properties = array_properties self.parameters = parameters or {} -self.retry_strategy = retry_strategy or {} -if not self.retry_strategy.get("attempts", None): -self.retry_strategy["attempts"] = 1 +self.retry_strategy = retry_strategy self.waiters = waiters self.tags = tags or {} self.wait_for_completion = wait_for_completion diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index 2ac9557813..f769c1baa8 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -20,6 +20,7 @@ from __future__ import annotations from unittest import mock from unittest.mock import patch +import botocore.client import pytest from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, TaskDeferred @@ -64,7 +65,7 @@ class TestBatchOperator: 
max_retries=self.MAX_RETRIES, status_retries=self.STATUS_RETRIES, parameters=None, -retry_strategy=None, +retry_strategy={"attempts": 1}, container_overrides={}, array_properties=None, aws_conn_id="airflow_test", @@ -112,6 +113,34 @@ class TestBatchOperator: self.get_client_type_mock.assert_called_once_with(region_name="eu-west-1") +def test_init_defaults(self): +"""Test constructor default values""" +batch_job = BatchOperator( +task_id="task", +job_name=JOB_NAME, +job_queue="queue", +job_definition="hello-world", +) +assert batch_job.job_id is None +assert batch_job.job_name == JOB_NAME +assert batch_job.job_queue == "queue" +assert batch_job.job_definition == "hello-world" +assert batch_job.waiters is None +assert batch_job.hook.max_retries == 4200 +assert batch_job.hook.status_retries == 10 +assert batch_job.parameters == {} +assert batch_job.retry_strategy is None +assert batch_job.container_overrides is None +assert batch_job.array_properties is None +assert batch_job.node_overrides is None +assert batch_job.share_identifier is None +assert batch_job.scheduling_priority_override is None +assert batch_job.hook.region_name is None +assert batch_job.hook.aws_conn_id is None +assert issubclass(type(batch_job.hook.client), botocore.client.BaseClient) +assert batch_job.tags == {} +assert batch_job.wait_for_completion is True + def test_template_fields_overrides(self): assert self.batch.template_fields == ( "job_id", @@ -238,7 +267,6 @@ class TestBatchOperator: "jobName": JOB_NAME, "jobDefinition": "hello-world", "parameters": {}, -"retryStrategy": {"attempts": 1}, "tags": {}, } if override == "overrides":
(airflow) branch main updated: Add MySQL LTS 8.4 (#39488)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 7e99ead587 Add MySQL LTS 8.4 (#39488) 7e99ead587 is described below commit 7e99ead587d1860fedd0adcfbd779f39f41b2ae2 Author: Andrey Anshin AuthorDate: Thu May 9 23:14:20 2024 +0400 Add MySQL LTS 8.4 (#39488) --- README.md | 2 +- dev/breeze/doc/images/output-commands.svg | 2 +- dev/breeze/doc/images/output_setup_config.svg | 2 +- dev/breeze/doc/images/output_setup_config.txt | 2 +- dev/breeze/doc/images/output_shell.svg | 2 +- dev/breeze/doc/images/output_shell.txt | 2 +- dev/breeze/doc/images/output_start-airflow.svg | 2 +- dev/breeze/doc/images/output_start-airflow.txt | 2 +- dev/breeze/doc/images/output_testing_db-tests.svg | 2 +- dev/breeze/doc/images/output_testing_db-tests.txt | 2 +- dev/breeze/doc/images/output_testing_integration-tests.svg | 2 +- dev/breeze/doc/images/output_testing_integration-tests.txt | 2 +- dev/breeze/doc/images/output_testing_tests.svg | 2 +- dev/breeze/doc/images/output_testing_tests.txt | 2 +- dev/breeze/src/airflow_breeze/global_constants.py | 4 ++-- dev/breeze/tests/test_selective_checks.py | 4 ++-- generated/PYPI_README.md | 2 +- 17 files changed, 19 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 5bf6034a92..916c682a1b 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Apache Airflow is tested with: | Platform| AMD64/ARM64(\*)| AMD64/ARM64(\*) | | Kubernetes | 1.26, 1.27, 1.28, 1.29 | 1.26, 1.27, 1.28, 1.29 | | PostgreSQL | 12, 13, 14, 15, 16 | 12, 13, 14, 15, 16 | -| MySQL | 8.0, Innovation| 8.0, Innovation | +| MySQL | 8.0, 8.4, Innovation | 8.0, Innovation | | SQLite | 3.15.0+| 3.15.0+ | \* Experimental diff --git a/dev/breeze/doc/images/output-commands.svg b/dev/breeze/doc/images/output-commands.svg index 0a08c99dbc..8fd24ada23 100644 --- 
a/dev/breeze/doc/images/output-commands.svg +++ b/dev/breeze/doc/images/output-commands.svg @@ -317,7 +317,7 @@ │(sqlite|mysql|postgres|none) [...] │[default:sqlite]& [...] │--postgres-version│--mysql-version│--mysql-version│--db-reset╰──╯ ╭─BuildCIimage(beforeenteringshell)< [...] diff --git a/dev/breeze/doc/images/output_setup_config.svg b/dev/breeze/doc/images/output_setup_config.svg index 862cc49edf..5ac7126f54 100644 --- a/dev/breeze/doc/images/output_setup_config.svg +++ b/dev/breeze/doc/images/output_setup_config.svg @@ -138,7 +138,7 @@ │(sqlite|mysql|postgres|none) [...] │[default:sqlite] [...] │--postgres│--mysql│--mysql│--cheatsheet│--asciiart│--colour│(sqlite|mysql|postgres|none)& [...] │[default:sqlite]& [...] │--postgres-version│--mysql-version│--mysql-version│--db-reset╰──╯ ╭─Chooseexecutor───│(sqlite|mysql|postgres|none) [...] │[default:sqlite]& [...] │--postgres│--mysql│--mysql│--db╰──╯ ╭─Choosingexecutor [...] diff --git a/dev/breeze/doc/images/output_start-airflow.txt b/dev/breeze/doc/images/output_start-airflow.txt index b614227769..67f27aab3d 100644 --- a/dev/breeze/doc/images/output_start-airflow.txt +++ b/dev/breeze/doc/images/output_start-airflow.txt @@ -1 +1 @@ -4b9738d9c632eb84243c5f8217003a32 +605c2807c19369ea7967ea0467caa497 diff --git a/dev/breeze/doc/images/output_testing_db-tests.svg b/dev/breeze/doc/images/output_testing_db-tests.svg index 77c6c53294..edc7922b18 100644 --- a/dev/breeze/doc/images/output_testing_db-tests.svg +++ b/dev/breeze/doc/images/output_testing_db-tests.svg @@ -333,7 +333,7 @@ │(3.8|3.9|3.10|3.11|3.12)&#
(airflow) branch main updated (ee584f4eb1 -> 598398a816)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from ee584f4eb1 Add the existing_nullable to the downgrade side of the migration (#39374) add 598398a816 Amazon Bedrock - Knowledge Bases and Data Sources (#39245) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/hooks/bedrock.py | 20 + ...ied_permissions.py => opensearch_serverless.py} | 19 +- airflow/providers/amazon/aws/operators/bedrock.py | 315 +++- airflow/providers/amazon/aws/sensors/bedrock.py| 173 ++- .../amazon/aws/sensors/opensearch_serverless.py| 129 + airflow/providers/amazon/aws/triggers/bedrock.py | 89 +++- ...lambda_function.py => opensearch_serverless.py} | 54 +-- .../amazon/aws/waiters/bedrock-agent.json | 73 +++ .../amazon/aws/waiters/opensearchserverless.json | 36 ++ airflow/providers/amazon/provider.yaml | 15 + .../operators/bedrock.rst | 97 .../operators/opensearchserverless.rst | 59 +++ .../aws/amazon-opensearch_light...@4x.png | Bin 0 -> 12124 bytes tests/providers/amazon/aws/hooks/test_bedrock.py | 3 +- .../{test_sts.py => test_opensearch_serverless.py} | 12 +- .../providers/amazon/aws/operators/test_bedrock.py | 131 - tests/providers/amazon/aws/sensors/test_bedrock.py | 164 ++- .../aws/sensors/test_opensearch_serverless.py | 113 + .../providers/amazon/aws/triggers/test_bedrock.py | 93 +++- .../aws/triggers/test_opensearch_serverless.py | 88 tests/providers/amazon/aws/utils/test_waiter.py| 7 +- .../amazon/aws/waiters/test_bedrock_agent.py | 111 + .../aws/waiters/test_opensearch_serverless.py | 71 +++ .../amazon/aws/example_bedrock_knowledge_base.py | 527 + 24 files changed, 2312 insertions(+), 87 deletions(-) copy airflow/providers/amazon/aws/hooks/{verified_permissions.py => opensearch_serverless.py} (67%) create mode 100644 airflow/providers/amazon/aws/sensors/opensearch_serverless.py copy 
airflow/providers/amazon/aws/triggers/{lambda_function.py => opensearch_serverless.py} (50%) create mode 100644 airflow/providers/amazon/aws/waiters/bedrock-agent.json create mode 100644 airflow/providers/amazon/aws/waiters/opensearchserverless.json create mode 100644 docs/apache-airflow-providers-amazon/operators/opensearchserverless.rst create mode 100644 docs/integration-logos/aws/amazon-opensearch_light...@4x.png copy tests/providers/amazon/aws/hooks/{test_sts.py => test_opensearch_serverless.py} (71%) create mode 100644 tests/providers/amazon/aws/sensors/test_opensearch_serverless.py create mode 100644 tests/providers/amazon/aws/triggers/test_opensearch_serverless.py create mode 100644 tests/providers/amazon/aws/waiters/test_bedrock_agent.py create mode 100644 tests/providers/amazon/aws/waiters/test_opensearch_serverless.py create mode 100644 tests/system/providers/amazon/aws/example_bedrock_knowledge_base.py
(airflow) branch main updated (f6fb4ccf2d -> e3e6aa9b3e)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from f6fb4ccf2d docs: fix environment variable names when section name has dot in it (#39312) add e3e6aa9b3e Add support for role arn for aws creds (#38911) No new revisions were added by this update. Summary of changes: .../cloud/hooks/cloud_storage_transfer_service.py | 1 + .../operators/cloud_storage_transfer_service.py| 36 +++ .../test_cloud_storage_transfer_service.py | 52 +- 3 files changed, 79 insertions(+), 10 deletions(-)
(airflow) branch main updated (f3ab31de97 -> c25d346adf)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from f3ab31de97 Use `model_dump` instead of `dict` for serialize Pydantic V2 model (#38933) add c25d346adf Amazon Bedrock - Model Throughput Provisioning (#38850) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/operators/bedrock.py | 103 +- airflow/providers/amazon/aws/sensors/bedrock.py| 149 + airflow/providers/amazon/aws/triggers/bedrock.py | 36 + airflow/providers/amazon/aws/waiters/bedrock.json | 31 + .../operators/bedrock.rst | 37 + tests/always/test_project_structure.py | 1 + .../providers/amazon/aws/operators/test_bedrock.py | 47 +++ tests/providers/amazon/aws/sensors/test_bedrock.py | 79 +-- .../providers/amazon/aws/triggers/test_bedrock.py | 36 - tests/providers/amazon/aws/waiters/test_bedrock.py | 41 +- .../system/providers/amazon/aws/example_bedrock.py | 32 - 11 files changed, 553 insertions(+), 39 deletions(-)
(airflow) branch main updated: Amazon Bedrock - Fix system test (#38887)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new b6ff085679 Amazon Bedrock - Fix system test (#38887) b6ff085679 is described below commit b6ff085679c283cd3ccc3edf20dd3e6b0eaec967 Author: D. Ferruzzi AuthorDate: Wed Apr 10 10:40:34 2024 -0700 Amazon Bedrock - Fix system test (#38887) --- .../system/providers/amazon/aws/example_bedrock.py | 92 +++--- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/tests/system/providers/amazon/aws/example_bedrock.py b/tests/system/providers/amazon/aws/example_bedrock.py index 12e2461547..e25bbb8ed7 100644 --- a/tests/system/providers/amazon/aws/example_bedrock.py +++ b/tests/system/providers/amazon/aws/example_bedrock.py @@ -18,12 +18,12 @@ from __future__ import annotations import json from datetime import datetime +from os import environ -from botocore.exceptions import ClientError - -from airflow.decorators import task +from airflow.decorators import task, task_group from airflow.models.baseoperator import chain from airflow.models.dag import DAG +from airflow.operators.empty import EmptyOperator from airflow.providers.amazon.aws.hooks.bedrock import BedrockHook from airflow.providers.amazon.aws.operators.bedrock import ( BedrockCustomizeModelOperator, @@ -35,6 +35,7 @@ from airflow.providers.amazon.aws.operators.s3 import ( S3DeleteBucketOperator, ) from airflow.providers.amazon.aws.sensors.bedrock import BedrockCustomizeModelCompletedSensor +from airflow.utils.edgemodifier import Label from airflow.utils.trigger_rule import TriggerRule from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder @@ -44,10 +45,10 @@ sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).bu DAG_ID = "example_bedrock" -# Creating a custom model takes nearly two hours. 
If SKIP_LONG_TASKS is True then set -# the trigger rule to an improbable state. This way we can still have the code snippets -# for docs, and we can manually run the full tests occasionally. -SKIP_LONG_TASKS = True +# Creating a custom model takes nearly two hours. If SKIP_LONG_TASKS +# is True then these tasks will be skipped. This way we can still have +# the code snippets for docs, and we can manually run the full tests. +SKIP_LONG_TASKS = environ.get("SKIP_LONG_SYSTEM_TEST_TASKS", default=True) LLAMA_MODEL_ID = "meta.llama2-13b-chat-v1" PROMPT = "What color is an orange?" @@ -61,15 +62,41 @@ HYPERPARAMETERS = { } -@task -def delete_custom_model(model_name: str): -try: -BedrockHook().conn.delete_custom_model(modelIdentifier=model_name) -except ClientError as e: -if SKIP_LONG_TASKS and (e.response["Error"]["Code"] == "ValidationException"): -# There is no model to delete. Since we skipped making one, that's fine. -return -raise e +@task_group +def customize_model_workflow(): +# [START howto_operator_customize_model] +customize_model = BedrockCustomizeModelOperator( +task_id="customize_model", +job_name=custom_model_job_name, +custom_model_name=custom_model_name, +role_arn=test_context[ROLE_ARN_KEY], + base_model_id=f"arn:aws:bedrock:us-east-1::foundation-model/{TITAN_MODEL_ID}", +hyperparameters=HYPERPARAMETERS, +training_data_uri=training_data_uri, +output_data_uri=f"s3://{bucket_name}/myOutputData", +) +# [END howto_operator_customize_model] + +# [START howto_sensor_customize_model] +await_custom_model_job = BedrockCustomizeModelCompletedSensor( +task_id="await_custom_model_job", +job_name=custom_model_job_name, +) +# [END howto_sensor_customize_model] + +@task +def delete_custom_model(): + BedrockHook().conn.delete_custom_model(modelIdentifier=custom_model_name) + +@task.branch +def run_or_skip(): +return end_workflow.task_id if SKIP_LONG_TASKS else customize_model.task_id + +run_or_skip = run_or_skip() +end_workflow = EmptyOperator(task_id="end_workflow", 
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + +chain(run_or_skip, Label("Long-running tasks skipped"), end_workflow) +chain(run_or_skip, customize_model, await_custom_model_job, delete_custom_model(), end_workflow) with DAG( @@ -95,7 +122,7 @@ with DAG( upload_training_data = S3CreateObjectOperator( task_id="upload_data", s3_bucket=bucket_name, -s3_key=training_data_uri, +s3_key=input_data_s3_key, data=json.dumps(TRAIN_DATA), ) @@ -115,30 +142,6 @@ with DAG( ) # [END howto_operator_invoke_titan_mo
(airflow) branch main updated (6027aa58f5 -> 6abd2c78c0)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 6027aa58f5 Remove a couple items from the 2.9.0 changelog (#38866) add 6abd2c78c0 Amazon Bedrock - Clean up hook unit tests (#38849) No new revisions were added by this update. Summary of changes: tests/providers/amazon/aws/hooks/test_bedrock.py | 47 +--- 1 file changed, 10 insertions(+), 37 deletions(-)
(airflow) branch main updated: Amazon Bedrock - Model Customization Jobs (#38693)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 7ed31d5fdf Amazon Bedrock - Model Customization Jobs (#38693) 7ed31d5fdf is described below commit 7ed31d5fdf510e00528522ea313a20b19e498522 Author: D. Ferruzzi AuthorDate: Mon Apr 8 13:22:16 2024 -0700 Amazon Bedrock - Model Customization Jobs (#38693) * Amazon Bedrock - Customize Model Operator/Sensor/Waiter/Trigger --- airflow/providers/amazon/aws/hooks/bedrock.py | 20 +++ airflow/providers/amazon/aws/operators/bedrock.py | 161 - airflow/providers/amazon/aws/sensors/bedrock.py| 110 ++ airflow/providers/amazon/aws/triggers/bedrock.py | 61 airflow/providers/amazon/aws/waiters/bedrock.json | 42 ++ airflow/providers/amazon/provider.yaml | 6 + .../operators/bedrock.rst | 38 + tests/providers/amazon/aws/hooks/test_bedrock.py | 36 - .../providers/amazon/aws/operators/test_bedrock.py | 161 ++--- tests/providers/amazon/aws/sensors/test_bedrock.py | 95 .../providers/amazon/aws/triggers/test_bedrock.py | 53 +++ tests/providers/amazon/aws/waiters/test_bedrock.py | 70 + .../system/providers/amazon/aws/example_bedrock.py | 106 +- 13 files changed, 929 insertions(+), 30 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/bedrock.py b/airflow/providers/amazon/aws/hooks/bedrock.py index 11bacd9414..96636eb952 100644 --- a/airflow/providers/amazon/aws/hooks/bedrock.py +++ b/airflow/providers/amazon/aws/hooks/bedrock.py @@ -19,6 +19,26 @@ from __future__ import annotations from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +class BedrockHook(AwsBaseHook): +""" +Interact with Amazon Bedrock. + +Provide thin wrapper around :external+boto3:py:class:`boto3.client("bedrock") `. + +Additional arguments (such as ``aws_conn_id``) may be specified and +are passed down to the underlying AwsBaseHook. + +.. 
seealso:: +- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` +""" + +client_type = "bedrock" + +def __init__(self, *args, **kwargs) -> None: +kwargs["client_type"] = self.client_type +super().__init__(*args, **kwargs) + + class BedrockRuntimeHook(AwsBaseHook): """ Interact with the Amazon Bedrock Runtime. diff --git a/airflow/providers/amazon/aws/operators/bedrock.py b/airflow/providers/amazon/aws/operators/bedrock.py index d8eaf9e5d3..ee34a9aef7 100644 --- a/airflow/providers/amazon/aws/operators/bedrock.py +++ b/airflow/providers/amazon/aws/operators/bedrock.py @@ -19,10 +19,17 @@ from __future__ import annotations import json from typing import TYPE_CHECKING, Any, Sequence -from airflow.providers.amazon.aws.hooks.bedrock import BedrockRuntimeHook +from botocore.exceptions import ClientError + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.bedrock import BedrockHook, BedrockRuntimeHook from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.triggers.bedrock import BedrockCustomizeModelCompletedTrigger +from airflow.providers.amazon.aws.utils import validate_execute_complete_event from airflow.providers.amazon.aws.utils.mixins import aws_template_fields from airflow.utils.helpers import prune_dict +from airflow.utils.timezone import utcnow if TYPE_CHECKING: from airflow.utils.context import Context @@ -91,3 +98,155 @@ class BedrockInvokeModelOperator(AwsBaseOperator[BedrockRuntimeHook]): self.log.info("Bedrock %s prompt: %s", self.model_id, self.input_data) self.log.info("Bedrock model response: %s", response_body) return response_body + + +class BedrockCustomizeModelOperator(AwsBaseOperator[BedrockHook]): +""" +Create a fine-tuning job to customize a base model. + +.. 
seealso:: +For more information on how to use this operator, take a look at the guide: +:ref:`howto/operator:BedrockCustomizeModelOperator` + +:param job_name: A unique name for the fine-tuning job. +:param custom_model_name: A name for the custom model being created. +:param role_arn: The Amazon Resource Name (ARN) of an IAM role that Amazon Bedrock can assume +to perform tasks on your behalf. +:param base_model_id: Name of the base model. +:param training_data_uri: The S3 URI where the training data is stored. +:param output_data_uri: The S3 URI where the output data is stored. +:param hyperparameters: Parameters related to tuning the model. +
(airflow) branch main updated: Typo fix (#38783)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 1f03b9c86c Typo fix (#38783) 1f03b9c86c is described below commit 1f03b9c86c7dd5937c7d32976d8653979e2f7e41 Author: D. Ferruzzi AuthorDate: Fri Apr 5 13:16:20 2024 -0700 Typo fix (#38783) --- airflow/providers/amazon/aws/sensors/sqs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index 7b93e3847c..657c6d9599 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -73,7 +73,7 @@ class SqsSensor(AwsBaseSensor[SqsHook]): :param delete_message_on_reception: Default to `True`, the messages are deleted from the queue as soon as being consumed. Otherwise, the messages remain in the queue after consumption and should be deleted manually. -:param deferrable: If True, the sensor will operate in deferrable more. This mode requires aiobotocore +:param deferrable: If True, the sensor will operate in deferrable mode. This mode requires aiobotocore module to be installed. (default: False, but can be overridden in config file by setting default_deferrable to True) :param aws_conn_id: The Airflow connection used for AWS credentials.
(airflow) branch main updated (a19a9cb523 -> e4e73ba093)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from a19a9cb523 removed usage of deprecated function for naming the pod in provider k8s pod.py (#38638) add e4e73ba093 Partially enable `TRY002` rule (#38756) No new revisions were added by this update. Summary of changes: pyproject.toml | 58 +++--- 1 file changed, 51 insertions(+), 7 deletions(-)
(airflow) branch main updated: Revert "Add executor field to the DB and parameter to the operators (#38054)" (#38472)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new cbca35918b Revert "Add executor field to the DB and parameter to the operators (#38054)" (#38472) cbca35918b is described below commit cbca35918ba9f9edd233c38072a92aed37f08c3c Author: D. Ferruzzi AuthorDate: Mon Mar 25 15:49:04 2024 -0700 Revert "Add executor field to the DB and parameter to the operators (#38054)" (#38472) This reverts commit 41d5e2226c10c78ee6f493f8e54637dca2f72e32. Co-authored-by: Jarek Potiuk --- .../0139_2_10_0_add_new_executor_field_to_db.py| 46 -- airflow/models/abstractoperator.py | 1 - airflow/models/baseoperator.py | 13 - airflow/models/mappedoperator.py | 5 - airflow/models/taskinstance.py | 7 - airflow/serialization/pydantic/taskinstance.py | 1 - airflow/serialization/schema.json | 1 - docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg| 811 ++--- docs/apache-airflow/migrations-ref.rst | 4 +- tests/models/test_taskinstance.py | 1 - tests/serialization/test_dag_serialization.py | 1 - tests/www/views/test_views_tasks.py| 7 - 13 files changed, 406 insertions(+), 494 deletions(-) diff --git a/airflow/migrations/versions/0139_2_10_0_add_new_executor_field_to_db.py b/airflow/migrations/versions/0139_2_10_0_add_new_executor_field_to_db.py deleted file mode 100644 index 9e3d615f50..00 --- a/airflow/migrations/versions/0139_2_10_0_add_new_executor_field_to_db.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""add new executor field to db - -Revision ID: 677fdbb7fc54 -Revises: b4078ac230a1 -Create Date: 2024-03-25 15:26:59.186579 - -""" - -import sqlalchemy as sa -from alembic import op - - -# revision identifiers, used by Alembic. -revision = '677fdbb7fc54' -down_revision = 'b4078ac230a1' -branch_labels = None -depends_on = None -airflow_version = '2.10.0' - - -def upgrade(): -"""Apply add executor field to task instance""" -op.add_column('task_instance', sa.Column('executor', sa.String(length=1000), default=None)) - - -def downgrade(): -"""Unapply add executor field to task instance""" -op.drop_column('task_instance', 'executor') diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index 74911fc27c..f2d179f01b 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -59,7 +59,6 @@ if TYPE_CHECKING: DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner") DEFAULT_POOL_SLOTS: int = 1 DEFAULT_PRIORITY_WEIGHT: int = 1 -DEFAULT_EXECUTOR: str | None = None DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue") DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = conf.getboolean( "scheduler", "ignore_first_depends_on_past_by_default" diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index c59b66309a..8636dd6c2e 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -63,7 +63,6 @@ from airflow.exceptions import ( ) from airflow.lineage import apply_lineage, prepare_lineage from airflow.models.abstractoperator 
import ( -DEFAULT_EXECUTOR, DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, DEFAULT_OWNER, DEFAULT_POOL_SLOTS, @@ -209,7 +208,6 @@ _PARTIAL_DEFAULTS: dict[str, Any] = { "wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, "wait_for_downstream": False, "retries": DEFAULT_RETRIES, -"executor": DEFAULT_EXECUTOR, "queue": DEFAULT_QUEUE, "pool_slots": D
(airflow) branch main updated (ff28969ff3 -> 41d5e2226c)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from ff28969ff3 fix: EmrServerlessStartJobOperator not serializing DAGs correctly when partial/expand is used. (#38022) add 41d5e2226c Add executor field to the DB and parameter to the operators (#38054) No new revisions were added by this update. Summary of changes: ...=> 0139_2_10_0_add_new_executor_field_to_db.py} | 23 +- airflow/models/abstractoperator.py | 1 + airflow/models/baseoperator.py | 13 + airflow/models/mappedoperator.py | 5 + airflow/models/taskinstance.py | 7 + airflow/serialization/pydantic/taskinstance.py | 1 + airflow/serialization/schema.json | 1 + docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg| 811 +++-- docs/apache-airflow/migrations-ref.rst | 4 +- tests/models/test_taskinstance.py | 1 + tests/serialization/test_dag_serialization.py | 1 + tests/www/views/test_views_tasks.py| 7 + 13 files changed, 461 insertions(+), 416 deletions(-) copy airflow/migrations/versions/{0010_1_6_2_add_password_column_to_user.py => 0139_2_10_0_add_new_executor_field_to_db.py} (67%)
(airflow) branch main updated: fix: EmrServerlessStartJobOperator not serializing DAGs correctly when partial/expand is used. (#38022)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new ff28969ff3 fix: EmrServerlessStartJobOperator not serializing DAGs correctly when partial/expand is used. (#38022) ff28969ff3 is described below commit ff28969ff3370034ed9246d4ce9d0022129b3152 Author: jliu0812 <114856647+jliu0...@users.noreply.github.com> AuthorDate: Mon Mar 25 16:47:53 2024 -0500 fix: EmrServerlessStartJobOperator not serializing DAGs correctly when partial/expand is used. (#38022) --- airflow/providers/amazon/aws/operators/emr.py | 62 +++--- .../amazon/aws/operators/test_emr_serverless.py| 55 +++ 2 files changed, 111 insertions(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 7c4d86c5e8..01e1567eab 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -1253,27 +1253,77 @@ class EmrServerlessStartJobOperator(BaseOperator): op_extra_links = [] if isinstance(self, MappedOperator): +operator_class = self.operator_class enable_application_ui_links = self.partial_kwargs.get( "enable_application_ui_links" ) or self.expand_input.value.get("enable_application_ui_links") -job_driver = self.partial_kwargs.get("job_driver") or self.expand_input.value.get("job_driver") +job_driver = self.partial_kwargs.get("job_driver", {}) or self.expand_input.value.get( +"job_driver", {} +) configuration_overrides = self.partial_kwargs.get( "configuration_overrides" ) or self.expand_input.value.get("configuration_overrides") +# Configuration overrides can either be a list or a dictionary, depending on whether it's passed in as partial or expand. 
+if isinstance(configuration_overrides, list): +if any( +[ +operator_class.is_monitoring_in_job_override( +self=operator_class, +config_key="s3MonitoringConfiguration", +job_override=job_override, +) +for job_override in configuration_overrides +] +): +op_extra_links.extend([EmrServerlessS3LogsLink()]) +if any( +[ +operator_class.is_monitoring_in_job_override( +self=operator_class, +config_key="cloudWatchLoggingConfiguration", +job_override=job_override, +) +for job_override in configuration_overrides +] +): +op_extra_links.extend([EmrServerlessCloudWatchLogsLink()]) +else: +if operator_class.is_monitoring_in_job_override( +self=operator_class, +config_key="s3MonitoringConfiguration", +job_override=configuration_overrides, +): +op_extra_links.extend([EmrServerlessS3LogsLink()]) +if operator_class.is_monitoring_in_job_override( +self=operator_class, +config_key="cloudWatchLoggingConfiguration", +job_override=configuration_overrides, +): +op_extra_links.extend([EmrServerlessCloudWatchLogsLink()]) + else: +operator_class = self enable_application_ui_links = self.enable_application_ui_links configuration_overrides = self.configuration_overrides job_driver = self.job_driver +if operator_class.is_monitoring_in_job_override( +"s3MonitoringConfiguration", configuration_overrides +): +op_extra_links.extend([EmrServerlessS3LogsLink()]) +if operator_class.is_monitoring_in_job_override( +"cloudWatchLoggingConfiguration", configuration_overrides +): +op_extra_links.extend([EmrServerlessCloudWatchLogsLink()]) + if enable_application_ui_links: op_extra_links.extend([EmrServerlessDashboardLink()]) -if "sparkSubmit" in job_driver: +if isinstance(job_driver, list): +if any("sparkSubmit" in ind_job_driver for ind_job_driver in jo
(airflow) branch main updated (35fef2befb -> f2628fda7a)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 35fef2befb Revert "Make current working directory as templated field in BashOperator (#37928)" (#37952) add f2628fda7a ECS Executor - add support to adopt orphaned tasks. (#37786) No new revisions were added by this update. Summary of changes: airflow/executors/base_executor.py | 9 + .../amazon/aws/executors/ecs/ecs_executor.py | 43 +- .../providers/amazon/aws/executors/ecs/utils.py| 12 +++--- .../amazon/aws/executors/ecs/test_ecs_executor.py | 40 4 files changed, 97 insertions(+), 7 deletions(-)
(airflow) branch main updated (30f7b2abe6 -> 0232ad0318)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 30f7b2abe6 Avoid to use too broad `noqa` (#37862) add 0232ad0318 Fix external_executor_id being overwritten (#37784) No new revisions were added by this update. Summary of changes: airflow/models/taskinstance.py| 5 - tests/conftest.py | 2 ++ tests/models/test_taskinstance.py | 47 ++- 3 files changed, 52 insertions(+), 2 deletions(-)
(airflow) branch main updated (eb19f6d4d5 -> 217c4b9404)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from eb19f6d4d5 Add Yektanet to companies using Apache Airflow (#3) add 217c4b9404 Remove file constraints generated by uv (#37780) No new revisions were added by this update. Summary of changes: scripts/in_container/run_generate_constraints.py | 2 ++ 1 file changed, 2 insertions(+)
(airflow) branch main updated: D401 Support in Providers (simple) (#37258)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 08036e5df5 D401 Support in Providers (simple) (#37258) 08036e5df5 is described below commit 08036e5df5ae3ec9f600219361f86a1a3e8e9d19 Author: Andrey Anshin AuthorDate: Fri Feb 9 02:50:09 2024 +0400 D401 Support in Providers (simple) (#37258) --- airflow/providers/apache/hive/hooks/hive.py | 4 ++-- airflow/providers/apache/spark/hooks/spark_connect.py | 4 ++-- airflow/providers/apache/spark/hooks/spark_sql.py | 4 ++-- airflow/providers/apache/spark/hooks/spark_submit.py | 4 ++-- airflow/providers/atlassian/jira/hooks/jira.py| 4 ++-- airflow/providers/discord/hooks/discord_webhook.py| 2 +- airflow/providers/grpc/hooks/grpc.py | 2 +- airflow/providers/jenkins/hooks/jenkins.py| 4 ++-- airflow/providers/microsoft/mssql/hooks/mssql.py | 2 +- airflow/providers/odbc/hooks/odbc.py | 2 +- airflow/providers/opsgenie/hooks/opsgenie.py | 2 +- airflow/providers/redis/hooks/redis.py| 8 airflow/providers/sqlite/hooks/sqlite.py | 2 +- airflow/providers/ssh/hooks/ssh.py| 4 ++-- airflow/providers/tabular/hooks/tabular.py| 2 +- airflow/providers/yandex/hooks/yandex.py | 4 ++-- pyproject.toml| 16 17 files changed, 27 insertions(+), 43 deletions(-) diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py index d0dfa10c62..a060556c20 100644 --- a/airflow/providers/apache/hive/hooks/hive.py +++ b/airflow/providers/apache/hive/hooks/hive.py @@ -119,7 +119,7 @@ class HiveCliHook(BaseHook): @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: -"""Returns connection widgets to add to connection form.""" +"""Return connection widgets to add to Hive Client Wrapper connection form.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms 
import BooleanField, StringField @@ -134,7 +134,7 @@ class HiveCliHook(BaseHook): @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: -"""Returns custom field behaviour.""" +"""Return custom UI field behaviour for Hive Client Wrapper connection.""" return { "hidden_fields": ["extra"], "relabeling": {}, diff --git a/airflow/providers/apache/spark/hooks/spark_connect.py b/airflow/providers/apache/spark/hooks/spark_connect.py index 179680387c..acdeaae1f4 100644 --- a/airflow/providers/apache/spark/hooks/spark_connect.py +++ b/airflow/providers/apache/spark/hooks/spark_connect.py @@ -39,7 +39,7 @@ class SparkConnectHook(BaseHook, LoggingMixin): @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: -"""Return custom field behaviour.""" +"""Return custom UI field behaviour for Spark Connect connection.""" return { "hidden_fields": [ "schema", @@ -49,7 +49,7 @@ class SparkConnectHook(BaseHook, LoggingMixin): @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: -"""Returns connection widgets to add to connection form.""" +"""Return connection widgets to add to Spark Connect connection form.""" from flask_babel import lazy_gettext from wtforms import BooleanField diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py index 46eec49f30..4d5da04567 100644 --- a/airflow/providers/apache/spark/hooks/spark_sql.py +++ b/airflow/providers/apache/spark/hooks/spark_sql.py @@ -56,7 +56,7 @@ class SparkSqlHook(BaseHook): @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: -"""Return custom field behaviour.""" +"""Return custom UI field behaviour for Spark SQL connection.""" return { "hidden_fields": ["schema", "login", "password", "extra"], "relabeling": {}, @@ -64,7 +64,7 @@ class SparkSqlHook(BaseHook): @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: -"""Returns connection widgets to add to connection f
(airflow) branch main updated: ECS Executor - Add backoff on failed task retry (#37109)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 41ebf28103 ECS Executor - Add backoff on failed task retry (#37109) 41ebf28103 is described below commit 41ebf28103007d4894d86783dbcdc3afc16ec2f6 Author: D. Ferruzzi AuthorDate: Mon Feb 5 10:45:50 2024 -0800 ECS Executor - Add backoff on failed task retry (#37109) * ECS Executor - Add backoff on failed task retry --- .../amazon/aws/executors/ecs/ecs_executor.py | 23 +++--- .../providers/amazon/aws/executors/ecs/utils.py| 2 ++ .../executors/utils/exponential_backoff_retry.py | 27 ++ .../amazon/aws/executors/ecs/test_ecs_executor.py | 17 ++ .../utils/test_exponential_backoff_retry.py| 20 +++- 5 files changed, 77 insertions(+), 12 deletions(-) diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py index 4c805d9b53..2f0564ed9a 100644 --- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py +++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py @@ -42,7 +42,10 @@ from airflow.providers.amazon.aws.executors.ecs.utils import ( EcsQueuedTask, EcsTaskCollection, ) -from airflow.providers.amazon.aws.executors.utils.exponential_backoff_retry import exponential_backoff_retry +from airflow.providers.amazon.aws.executors.utils.exponential_backoff_retry import ( +calculate_next_attempt_delay, +exponential_backoff_retry, +) from airflow.providers.amazon.aws.hooks.ecs import EcsHook from airflow.utils import timezone from airflow.utils.state import State @@ -300,7 +303,14 @@ class AwsEcsExecutor(BaseExecutor): ) self.active_workers.increment_failure_count(task_key) self.pending_tasks.appendleft( -EcsQueuedTask(task_key, task_cmd, queue, exec_info, failure_count + 1) +EcsQueuedTask( +task_key, +task_cmd, +queue, +exec_info, +failure_count + 1, 
+timezone.utcnow() + calculate_next_attempt_delay(failure_count), +) ) else: self.log.error( @@ -331,6 +341,8 @@ class AwsEcsExecutor(BaseExecutor): exec_config = ecs_task.executor_config attempt_number = ecs_task.attempt_number _failure_reasons = [] +if timezone.utcnow() < ecs_task.next_attempt_time: +continue try: run_task_response = self._run_task(task_key, cmd, queue, exec_config) except NoCredentialsError: @@ -361,6 +373,9 @@ class AwsEcsExecutor(BaseExecutor): # Make sure the number of attempts does not exceed MAX_RUN_TASK_ATTEMPTS if int(attempt_number) <= int(self.__class__.MAX_RUN_TASK_ATTEMPTS): ecs_task.attempt_number += 1 +ecs_task.next_attempt_time = timezone.utcnow() + calculate_next_attempt_delay( +attempt_number +) self.pending_tasks.appendleft(ecs_task) else: self.log.error( @@ -422,7 +437,9 @@ class AwsEcsExecutor(BaseExecutor): """Save the task to be executed in the next sync by inserting the commands into a queue.""" if executor_config and ("name" in executor_config or "command" in executor_config): raise ValueError('Executor Config should never override "name" or "command"') -self.pending_tasks.append(EcsQueuedTask(key, command, queue, executor_config or {}, 1)) +self.pending_tasks.append( +EcsQueuedTask(key, command, queue, executor_config or {}, 1, timezone.utcnow()) +) def end(self, heartbeat_interval=10): """Waits for all currently running tasks to end, and doesn't launch any tasks.""" diff --git a/airflow/providers/amazon/aws/executors/ecs/utils.py b/airflow/providers/amazon/aws/executors/ecs/utils.py index 7913bdf227..139ef35d71 100644 --- a/airflow/providers/amazon/aws/executors/ecs/utils.py +++ b/airflow/providers/amazon/aws/executors/ecs/utils.py @@ -23,6 +23,7 @@ Data classes and utility functions used by the ECS executor. 
from __future__ import annotations +import datetime from collections import defaultdict from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Callable, Dict, List @@ -58,6 +59,7 @@ class EcsQueuedTask: queue: str executor_config: ExecutorConfigType attempt_number: int +next_attempt_time: datetime.datetime
(airflow) branch main updated: Add Doctrine in users (#37135)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new ecb7e3a56e Add Doctrine in users (#37135) ecb7e3a56e is described below commit ecb7e3a56e5f96bbbd3c6a7f97c40ba733bcc193 Author: Aurelien Didier AuthorDate: Thu Feb 1 23:53:19 2024 +0100 Add Doctrine in users (#37135) --- INTHEWILD.md | 1 + 1 file changed, 1 insertion(+) diff --git a/INTHEWILD.md b/INTHEWILD.md index 92744e76c7..7ccd3022e0 100644 --- a/INTHEWILD.md +++ b/INTHEWILD.md @@ -172,6 +172,7 @@ Currently, **officially** using Airflow: 1. [Digital First Media](http://www.digitalfirstmedia.com/) [[@duffn](https://github.com/duffn) & [@mschmo](https://github.com/mschmo) & [@seanmuth](https://github.com/seanmuth)] 1. [Disney](https://www.disney.com/) [[@coolbeans201](https://github.com/coolbeans201)] 1. [Docsity](https://www.docsity.com/) +1. [Doctrine](https://www.doctrine.fr/)[[@anteverse](https://github.com/anteverse)] 1. [DoorDash](https://www.doordash.com/) 1. [Dotmodus](http://dotmodus.com) [[@dannylee12](https://github.com/dannylee12)] 1. [Drivy](https://www.drivy.com) [[@AntoineAugusti](https://github.com/AntoineAugusti)]
(airflow) branch main updated: Add a fuzzy/regex pattern-matching for metric allow and block list (#36250)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 481feb32a8 Add a fuzzy/regex pattern-matching for metric allow and block list (#36250) 481feb32a8 is described below commit 481feb32a835277bbd26715c73c2bb85b25b99ad Author: D. Ferruzzi AuthorDate: Fri Jan 12 16:19:39 2024 -0800 Add a fuzzy/regex pattern-matching for metric allow and block list (#36250) * Add a fuzzy match for metric allow and block list * Add deprecation notice and regex pattern unit tests * rename "fuzzy matching" to "pattern matching" * Improve pattern-matching test * clarify docstring phrasing Co-authored-by: Niko Oliveira * line length issue * rephrased docs * hussein changes - Co-authored-by: Niko Oliveira --- airflow/config_templates/config.yml | 24 -- airflow/metrics/otel_logger.py | 11 ++- airflow/metrics/statsd_logger.py| 17 + airflow/metrics/validators.py | 64 +++- tests/core/test_stats.py| 148 +--- 5 files changed, 208 insertions(+), 56 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 1788e00593..79932a876a 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -944,24 +944,32 @@ metrics: description: | StatsD (https://github.com/etsy/statsd) integration settings. options: +metrics_use_pattern_match: + description: | +If true, metrics_allow_list and metrics_block_list will use regex pattern matching +anywhere within the metric name instead of only prefix matching at the start of the name. 
+ version_added: 2.9.0 + type: boolean + example: ~ + default: "False" metrics_allow_list: description: | -If you want to avoid emitting all the available metrics, you can configure an -allow list of prefixes (comma separated) to send only the metrics that start -with the elements of the list (e.g: "scheduler,executor,dagrun") +Configure an allow list (comma separated string) to send only certain metrics. +If metrics_use_pattern_match is false, match only the exact metric name prefix. +If metrics_use_pattern_match is true, provide regex patterns to match. version_added: 2.6.0 type: string - example: ~ + example: "\"scheduler,executor,dagrun\" or \"^scheduler,^executor,heartbeat|timeout\"" default: "" metrics_block_list: description: | -If you want to avoid emitting all the available metrics, you can configure a -block list of prefixes (comma separated) to filter out metrics that start with -the elements of the list (e.g: "scheduler,executor,dagrun"). +Configure a block list (comma separated string) to block certain metrics from being emitted. If metrics_allow_list and metrics_block_list are both configured, metrics_block_list is ignored. +If metrics_use_pattern_match is false, match only the exact metric name prefix. +If metrics_use_pattern_match is true, provide regex patterns to match. 
version_added: 2.6.0 type: string - example: ~ + example: "\"scheduler,executor,dagrun\" or \"^scheduler,^executor,heartbeat|timeout\"" default: "" statsd_on: description: | diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 2668f1ec97..6e88975f1b 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -35,6 +35,8 @@ from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( OTEL_NAME_MAX_LENGTH, AllowListValidator, +ListValidator, +get_validator, stat_name_otel_handler, ) @@ -166,11 +168,11 @@ class SafeOtelLogger: self, otel_provider, prefix: str = DEFAULT_METRIC_NAME_PREFIX, -allow_list_validator=AllowListValidator(), +metrics_validator: ListValidator = AllowListValidator(), ): self.otel: Callable = otel_provider self.prefix: str = prefix -self.metrics_validator = allow_list_validator +self.metrics_validator = metrics_validator self.meter = otel_provider.get_meter(__name__) self.metrics_map = MetricsMap(self.meter) @@ -393,9 +395,6 @@ def get_otel_logger(cls) -> SafeOtelLogger: interval = conf.getint("metrics", "otel_interval_milliseconds", fallback=None) # ex: 3 debug = conf.getboolean("metrics", "ote
(airflow) branch main updated: metrics tagging documentation (#36627)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 667b842632 metrics tagging documentation (#36627) 667b842632 is described below commit 667b842632fbee984b940a3b8b5a1f0bb3749a0f Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Fri Jan 12 07:14:05 2024 +0530 metrics tagging documentation (#36627) * metrics tagging documentation --- .../logging-monitoring/metrics.rst | 62 +- 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index aff3ec3e9a..4c91f37b71 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -147,41 +147,59 @@ Name Descripti ``LocalTaskJob`` ``local_task_job.task_exit`` Number of ``LocalTaskJob`` terminations with a while running a task of a DAG . +``local_task_job.task_exit`` Number of ``LocalTaskJob`` terminations with a + while running a task of a DAG . + Metric with job_id, dag_id, task_id and return_code tagging. ``operator_failures_`` Operator failures +``operator_failures`` Operator failures. Metric with operator_name tagging. ``operator_successes_`` Operator successes -``ti_failures``Overall task instances failures -``ti_successes`` Overall task instances successes -``previously_succeeded`` Number of previously succeeded task instances -``zombies_killed`` Zombie tasks killed +``operator_successes`` Operator successes. Metric with operator_name tagging. +``ti_failures``Overall task instances failures. Metric with dag_id and task_id tagging. +``ti_successes`` Overall task instances successes. 
Metric with dag_id and task_id tagging. +``previously_succeeded`` Number of previously succeeded task instances. Metric with dag_id and task_id tagging. +``zombies_killed`` Zombie tasks killed. Metric with dag_id and task_id tagging. ``scheduler_heartbeat`` Scheduler heartbeats ``dag_processing.processes`` Relative number of currently running DAG parsing processes (ie this delta - is negative when, since the last metric was sent, processes have completed) -``dag_processing.processor_timeouts`` Number of file processors that have been killed due to taking too long + is negative when, since the last metric was sent, processes have completed). + Metric with file_path and action tagging. +``dag_processing.processor_timeouts`` Number of file processors that have been killed due to taking too long. + Metric with file_path tagging. ``dag_processing.sla_callback_count`` Number of SLA callbacks received ``dag_processing.other_callback_count``Number of non-SLA callbacks received ``dag_processing.file_path_queue_update_count``Number of times we've scanned the filesystem and queued all existing dags ``dag_file_processor_timeouts`` (DEPRECATED) same behavior as ``dag_processing.processor_timeouts`` ``dag_processing.manager_stalls`` Number of stalled ``DagFileProcessorM
(airflow) branch main updated: Adds support for capacity providers to ECS Executor (#36722)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new e7166bb759 Adds support for capacity providers to ECS Executor (#36722) e7166bb759 is described below commit e7166bb7594086ace1c8e34f358e236ab0dcd2b7 Author: D. Ferruzzi AuthorDate: Thu Jan 11 11:55:22 2024 -0800 Adds support for capacity providers to ECS Executor (#36722) Add support for Capacity Providers --- .../aws/executors/ecs/ecs_executor_config.py | 17 ++ .../providers/amazon/aws/executors/ecs/utils.py| 2 +- airflow/providers/amazon/provider.yaml | 19 ++- .../amazon/aws/executors/ecs/test_ecs_executor.py | 65 +- 4 files changed, 99 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py index f0fa97852a..4f57b72d96 100644 --- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py +++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py @@ -40,6 +40,7 @@ from airflow.providers.amazon.aws.executors.ecs.utils import ( camelize_dict_keys, parse_assign_public_ip, ) +from airflow.providers.amazon.aws.hooks.ecs import EcsHook from airflow.utils.helpers import prune_dict @@ -60,6 +61,22 @@ def build_task_kwargs() -> dict: task_kwargs = _fetch_config_values() task_kwargs.update(_fetch_templated_kwargs()) +has_launch_type: bool = "launch_type" in task_kwargs +has_capacity_provider: bool = "capacity_provider_strategy" in task_kwargs + +if has_capacity_provider and has_launch_type: +raise ValueError( +"capacity_provider_strategy and launch_type are mutually exclusive, you can not provide both." 
+) +elif "cluster" in task_kwargs and not (has_capacity_provider or has_launch_type): +# Default API behavior if neither is provided is to fall back on the default capacity +# provider if it exists. Since it is not a required value, check if there is one +# before using it, and if there is not then use the FARGATE launch_type as +# the final fallback. +cluster = EcsHook().conn.describe_clusters(clusters=[task_kwargs["cluster"]])["clusters"][0] +if not cluster.get("defaultCapacityProviderStrategy"): +task_kwargs["launch_type"] = "FARGATE" + # There can only be 1 count of these containers task_kwargs["count"] = 1 # type: ignore # There could be a generic approach to the below, but likely more convoluted then just manually ensuring diff --git a/airflow/providers/amazon/aws/executors/ecs/utils.py b/airflow/providers/amazon/aws/executors/ecs/utils.py index 4966fa3d2b..7913bdf227 100644 --- a/airflow/providers/amazon/aws/executors/ecs/utils.py +++ b/airflow/providers/amazon/aws/executors/ecs/utils.py @@ -44,7 +44,6 @@ CONFIG_DEFAULTS = { "conn_id": "aws_default", "max_run_task_attempts": "3", "assign_public_ip": "False", -"launch_type": "FARGATE", "platform_version": "LATEST", "check_health_on_startup": "True", } @@ -81,6 +80,7 @@ class RunTaskKwargsConfigKeys(BaseConfigKeys): """Keys loaded into the config which are valid ECS run_task kwargs.""" ASSIGN_PUBLIC_IP = "assign_public_ip" +CAPACITY_PROVIDER_STRATEGY = "capacity_provider_strategy" CLUSTER = "cluster" LAUNCH_TYPE = "launch_type" PLATFORM_VERSION = "platform_version" diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index c8b1e8e50e..4cd8a1278a 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -838,6 +838,19 @@ config: type: string example: "ecs_executor_cluster" default: ~ + capacity_provider_strategy: +description: | + The capacity provider strategy to use for the task. 
+ + If a Capacity Provider Strategy is specified, the Launch Type parameter must be omitted. If + no Capacity Provider Strategy or Launch Type is specified, the Default CapacityProvider Strategy + for the cluster is used, if present. + + When you use cluster auto scaling, you must specify Capacity Provider Strategy and not Launch Type. +version_added: "8.17" +type: string +example: "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 'cp2', 'weight': 1}]" +default
(airflow) branch main updated: Increase timeout in `example_ec2` system test when hibernating an instance (#36114)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new abdd0454ec Increase timeout in `example_ec2` system test when hibernating an instance (#36114) abdd0454ec is described below commit abdd0454ec1be9ce6eb04e67375e5894963fac31 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Thu Dec 7 17:19:30 2023 -0500 Increase timeout in `example_ec2` system test when hibernating an instance (#36114) --- tests/system/providers/amazon/aws/example_ec2.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/system/providers/amazon/aws/example_ec2.py b/tests/system/providers/amazon/aws/example_ec2.py index 11320f1047..10b9c62338 100644 --- a/tests/system/providers/amazon/aws/example_ec2.py +++ b/tests/system/providers/amazon/aws/example_ec2.py @@ -172,7 +172,8 @@ with DAG( ) # [END howto_operator_ec2_hibernate_instance] hibernate_instance.wait_for_completion = True -hibernate_instance.max_attempts = 75 +hibernate_instance.poll_interval = 60 +hibernate_instance.max_attempts = 40 # [START howto_operator_ec2_terminate_instance] terminate_instance = EC2TerminateInstanceOperator(
(airflow) branch main updated (90953d1e2f -> 651b32606c)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 90953d1e2f Allow non-str value for path concat (#35290) add 651b32606c Clarify "task" in ECS Executor log messages (#35304) No new revisions were added by this update. Summary of changes: .../providers/amazon/aws/executors/ecs/__init__.py | 21 ++--- 1 file changed, 14 insertions(+), 7 deletions(-)
(airflow) branch main updated (119cd677ec -> ab80ca86e9)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 119cd677ec Improve test coverage for the rest api modules (#35219) add ab80ca86e9 Add back dag_run to docs (#35142) No new revisions were added by this update. Summary of changes: docs/apache-airflow/public-airflow-interface.rst | 10 ++ docs/conf.py | 1 + 2 files changed, 11 insertions(+)
[airflow] branch main updated: Add `check_interval` and `max_attempts` as parameter of `DynamoDBToS3Operator` (#34972)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new b1196460db Add `check_interval` and `max_attempts` as parameter of `DynamoDBToS3Operator` (#34972) b1196460db is described below commit b1196460db1a21b2c6c3ef2e841fc6d0c22afe97 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Oct 16 15:09:20 2023 -0400 Add `check_interval` and `max_attempts` as parameter of `DynamoDBToS3Operator` (#34972) --- airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py | 14 +- .../system/providers/amazon/aws/example_dynamodb_to_s3.py | 2 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index b83ff48906..5351f3ff7c 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -92,6 +92,9 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): the Unix epoch. The table export will be a snapshot of the table's state at this point in time. :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON or ION. +:param check_interval: The amount of time in seconds to wait between attempts. Only if ``export_time`` is +provided. +:param max_attempts: The maximum number of attempts to be made. Only if ``export_time`` is provided. 
""" template_fields: Sequence[str] = ( @@ -104,6 +107,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): "process_func", "export_time", "export_format", +"check_interval", +"max_attempts", ) template_fields_renderers = { @@ -121,6 +126,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes, export_time: datetime | None = None, export_format: str = "DYNAMODB_JSON", +check_interval: int = 30, +max_attempts: int = 60, **kwargs, ) -> None: super().__init__(**kwargs) @@ -132,6 +139,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): self.s3_key_prefix = s3_key_prefix self.export_time = export_time self.export_format = export_format +self.check_interval = check_interval +self.max_attempts = max_attempts @cached_property def hook(self): @@ -164,7 +173,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): ) waiter = self.hook.get_waiter("export_table") export_arn = response.get("ExportDescription", {}).get("ExportArn") -waiter.wait(ExportArn=export_arn) +waiter.wait( +ExportArn=export_arn, +WaiterConfig={"Delay": self.check_interval, "MaxAttempts": self.max_attempts}, +) def _export_entire_data(self): """Export all data from the table.""" diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py index 70ea9ad77e..dc08e2d5b9 100644 --- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py +++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py @@ -172,6 +172,8 @@ with DAG( s3_key_prefix=f"{S3_KEY_PREFIX}-3-", ) # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time] +# This operation can take a long time to complete +backup_db_to_point_in_time.max_attempts = 90 delete_table = delete_dynamodb_table(table_name=table_name)
[airflow] branch main updated: D401 Support - A thru Common (Inclusive) (#34934)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new f23170c9dd D401 Support - A thru Common (Inclusive) (#34934) f23170c9dd is described below commit f23170c9dd23556a40bd07b5d24f06220eec15c4 Author: D. Ferruzzi AuthorDate: Mon Oct 16 00:49:12 2023 -0700 D401 Support - A thru Common (Inclusive) (#34934) --- airflow/providers/amazon/aws/utils/suppress.py | 2 +- airflow/providers/asana/hooks/asana.py | 2 +- .../providers/celery/executors/celery_executor.py | 9 ++-- .../celery/executors/celery_executor_utils.py | 6 +-- .../celery/executors/celery_kubernetes_executor.py | 7 +-- airflow/providers/cloudant/hooks/cloudant.py | 4 +- .../backcompat/backwards_compat_converters.py | 25 +++ .../kubernetes/executors/kubernetes_executor.py| 9 ++-- .../executors/kubernetes_executor_utils.py | 8 ++-- .../executors/local_kubernetes_executor.py | 7 +-- .../providers/cncf/kubernetes/hooks/kubernetes.py | 32 ++--- airflow/providers/cncf/kubernetes/kube_client.py | 4 +- .../cncf/kubernetes/kubernetes_helper_functions.py | 2 +- airflow/providers/cncf/kubernetes/operators/pod.py | 14 +++--- airflow/providers/cncf/kubernetes/pod_generator.py | 8 ++-- .../cncf/kubernetes/pod_generator_deprecated.py| 6 +-- .../cncf/kubernetes/pod_launcher_deprecated.py | 20 - .../cncf/kubernetes/python_kubernetes_script.py| 4 +- airflow/providers/cncf/kubernetes/secret.py| 8 ++-- airflow/providers/cncf/kubernetes/triggers/pod.py | 4 +- .../providers/cncf/kubernetes/utils/pod_manager.py | 52 +++--- .../cncf/kubernetes/utils/xcom_sidecar.py | 2 +- airflow/providers/common/sql/hooks/sql.py | 51 +++-- airflow/providers/common/sql/operators/sql.py | 6 ++- 24 files changed, 152 insertions(+), 140 deletions(-) diff --git a/airflow/providers/amazon/aws/utils/suppress.py b/airflow/providers/amazon/aws/utils/suppress.py index 
6ef282c104..908106f0c3 100644 --- a/airflow/providers/amazon/aws/utils/suppress.py +++ b/airflow/providers/amazon/aws/utils/suppress.py @@ -41,7 +41,7 @@ log = logging.getLogger(__name__) def return_on_error(return_value: RT): """ -Helper decorator which suppress any ``Exception`` raised in decorator function. +Suppress any ``Exception`` raised in decorator function. Main use-case when functional is optional, however any error on functions/methods might raise any error which are subclass of ``Exception``. diff --git a/airflow/providers/asana/hooks/asana.py b/airflow/providers/asana/hooks/asana.py index 3bbb40eeb0..ec9360b27e 100644 --- a/airflow/providers/asana/hooks/asana.py +++ b/airflow/providers/asana/hooks/asana.py @@ -96,7 +96,7 @@ class AsanaHook(BaseHook): def create_task(self, task_name: str, params: dict | None) -> dict: """ -Creates an Asana task. +Create an Asana task. :param task_name: Name of the new task :param params: Other task attributes, such as due_on, parent, and notes. 
For a complete list diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py index ef07891079..f4aa2a9b75 100644 --- a/airflow/providers/celery/executors/celery_executor.py +++ b/airflow/providers/celery/executors/celery_executor.py @@ -340,14 +340,14 @@ class CeleryExecutor(BaseExecutor): self.update_all_task_states() def debug_dump(self) -> None: -"""Called in response to SIGUSR2 by the scheduler.""" +"""Debug dump; called in response to SIGUSR2 by the scheduler.""" super().debug_dump() self.log.info( "executor.tasks (%d)\n\t%s", len(self.tasks), "\n\t".join(map(repr, self.tasks.items())) ) def update_all_task_states(self) -> None: -"""Updates states of the tasks.""" +"""Update states of the tasks.""" self.log.debug("Inquiring about %s celery task(s)", len(self.tasks)) state_and_info_by_celery_task_id = self.bulk_state_fetcher.get_many(self.tasks.values()) @@ -362,7 +362,7 @@ class CeleryExecutor(BaseExecutor): self.tasks.pop(key, None) def update_task_state(self, key: TaskInstanceKey, state: str, info: Any) -> None: -"""Updates state of a single task.""" +"""Update state of a single task.""" try: if state == celery_states.SUCCESS: self.success(key, info) @@ -483,7
[airflow] branch main updated: D401 Support - Providers: DaskExecutor to Github (Inclusive) (#34935)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 7a93b19138 D401 Support - Providers: DaskExecutor to Github (Inclusive) (#34935) 7a93b19138 is described below commit 7a93b1913845710eb67ab4670c1be9e9382c030b Author: D. Ferruzzi AuthorDate: Mon Oct 16 00:49:36 2023 -0700 D401 Support - Providers: DaskExecutor to Github (Inclusive) (#34935) --- airflow/providers/databricks/hooks/databricks.py | 46 +++--- .../providers/databricks/hooks/databricks_base.py | 10 ++--- .../providers/databricks/hooks/databricks_sql.py | 5 ++- .../providers/databricks/operators/databricks.py | 8 ++-- .../databricks/operators/databricks_repos.py | 8 ++-- .../databricks/operators/databricks_sql.py | 2 +- .../databricks/sensors/databricks_partition.py | 8 ++-- .../providers/databricks/sensors/databricks_sql.py | 4 +- airflow/providers/databricks/utils/databricks.py | 2 +- airflow/providers/datadog/hooks/datadog.py | 8 ++-- airflow/providers/dbt/cloud/hooks/dbt.py | 40 +-- airflow/providers/dbt/cloud/operators/dbt.py | 4 +- airflow/providers/dbt/cloud/sensors/dbt.py | 8 ++-- airflow/providers/dbt/cloud/triggers/dbt.py| 2 +- airflow/providers/dbt/cloud/utils/openlineage.py | 4 +- airflow/providers/docker/hooks/docker.py | 4 +- .../providers/elasticsearch/hooks/elasticsearch.py | 8 ++-- .../providers/elasticsearch/log/es_task_handler.py | 8 ++-- airflow/providers/exasol/hooks/exasol.py | 3 +- airflow/providers/facebook/ads/hooks/ads.py| 4 +- airflow/providers/ftp/hooks/ftp.py | 20 +- airflow/providers/ftp/operators/ftp.py | 2 +- airflow/providers/github/hooks/github.py | 4 +- airflow/providers/github/sensors/github.py | 6 +-- 24 files changed, 110 insertions(+), 108 deletions(-) diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index 
06ba762327..5d3f714b44 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -196,7 +196,7 @@ class DatabricksHook(BaseDatabricksHook): def run_now(self, json: dict) -> int: """ -Utility function to call the ``api/2.1/jobs/run-now`` endpoint. +Call the ``api/2.1/jobs/run-now`` endpoint. :param json: The data used in the body of the request to the ``run-now`` endpoint. :return: the run_id as an int @@ -206,7 +206,7 @@ class DatabricksHook(BaseDatabricksHook): def submit_run(self, json: dict) -> int: """ -Utility function to call the ``api/2.1/jobs/runs/submit`` endpoint. +Call the ``api/2.1/jobs/runs/submit`` endpoint. :param json: The data used in the body of the request to the ``submit`` endpoint. :return: the run_id as an int @@ -223,7 +223,7 @@ class DatabricksHook(BaseDatabricksHook): page_token: str | None = None, ) -> list[dict[str, Any]]: """ -Lists the jobs in the Databricks Job Service. +List the jobs in the Databricks Job Service. :param limit: The limit/batch size used to retrieve jobs. :param offset: The offset of the first job to return, relative to the most recently created job. @@ -274,7 +274,7 @@ class DatabricksHook(BaseDatabricksHook): def find_job_id_by_name(self, job_name: str) -> int | None: """ -Finds job id by its name. If there are multiple jobs with the same name, raises AirflowException. +Find job id by its name; if there are multiple jobs with the same name, raise AirflowException. :param job_name: The name of the job to look up. :return: The job_id as an int or None if no job was found. @@ -295,7 +295,7 @@ class DatabricksHook(BaseDatabricksHook): self, batch_size: int = 25, pipeline_name: str | None = None, notebook_path: str | None = None ) -> list[dict[str, Any]]: """ -Lists the pipelines in Databricks Delta Live Tables. +List the pipelines in Databricks Delta Live Tables. :param batch_size: The limit/batch size used to retrieve pipelines. 
:param pipeline_name: Optional name of a pipeline to search. Cannot be combined with path. @@ -334,7 +334,7 @@ class DatabricksHook(BaseDatabricksHook): def find_pipeline_id_by_name(self, pipeline_name: str) -> str | None: """ -Finds pipeline id by its name. If multiple pipelines with the same name, raises AirflowException. +Find pipeline id by its name; if multiple pipe
[airflow] branch main updated (27671fa53d -> 84a3daed86)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 27671fa53d Add session configuration to deployment manager's responsibilities. (#34866) add 84a3daed86 Implements `AwsBaseOperator` and `AwsBaseSensor` (#34784) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/hooks/base_aws.py | 4 +- airflow/providers/amazon/aws/operators/base_aws.py | 97 airflow/providers/amazon/aws/sensors/base_aws.py | 96 airflow/providers/amazon/aws/utils/mixins.py | 165 airflow/providers/amazon/provider.yaml | 6 + tests/always/test_project_structure.py | 2 + tests/providers/amazon/aws/hooks/test_base_aws.py | 7 +- .../amazon/aws/operators/test_base_aws.py | 172 + .../providers/amazon/aws/sensors/test_base_aws.py | 172 + 9 files changed, 719 insertions(+), 2 deletions(-) create mode 100644 airflow/providers/amazon/aws/operators/base_aws.py create mode 100644 airflow/providers/amazon/aws/sensors/base_aws.py create mode 100644 airflow/providers/amazon/aws/utils/mixins.py create mode 100644 tests/providers/amazon/aws/operators/test_base_aws.py create mode 100644 tests/providers/amazon/aws/sensors/test_base_aws.py
[airflow] branch main updated (493431023f -> e333380077)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 493431023f Release notes for helm chart 1.11.0 (#34618) add e333380077 Clarify Amazon Lambda invocation and sensing (#34653) No new revisions were added by this update. Summary of changes: .../amazon/aws/sensors/lambda_function.py | 4 ++- .../operators/lambda.rst | 38 -- 2 files changed, 38 insertions(+), 4 deletions(-)
[airflow] branch main updated (3766ab07d7 -> cc360b73c9)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 3766ab07d7 More complete fix for dev release scripts to filter commit for PR (#33418) add cc360b73c9 Fixing static checks for FabAirflowSecurityManagerOverride (#33416) No new revisions were added by this update. Summary of changes: airflow/auth/managers/fab/security_manager/modules/db.py| 6 +++--- airflow/auth/managers/fab/security_manager/modules/oauth.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-)
[airflow] branch main updated: Enable D205 Support (#33398)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new a1e42f6626 Enable D205 Support (#33398) a1e42f6626 is described below commit a1e42f66260447c22c708e49943e3a39ba2b678f Author: D. Ferruzzi AuthorDate: Mon Aug 14 22:22:08 2023 -0700 Enable D205 Support (#33398) --- airflow/providers/google/cloud/hooks/datapipeline.py | 1 + pyproject.toml | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/hooks/datapipeline.py b/airflow/providers/google/cloud/hooks/datapipeline.py index 3203238a39..c9c4790106 100644 --- a/airflow/providers/google/cloud/hooks/datapipeline.py +++ b/airflow/providers/google/cloud/hooks/datapipeline.py @@ -32,6 +32,7 @@ DEFAULT_DATAPIPELINE_LOCATION = "us-central1" class DataPipelineHook(GoogleBaseHook): """ Hook for Google Data Pipelines. + All the methods in the hook where project_id is used must be called with keyword arguments rather than positional. """ diff --git a/pyproject.toml b/pyproject.toml index 630d44cf05..edca58432b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,6 @@ extend-select = [ ] extend-ignore = [ "D203", -"D205", "D212", "D213", "D214",
[airflow] branch main updated: Fix system-test pytest marker (#32978)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 11629f65f1 Fix system-test pytest marker (#32978) 11629f65f1 is described below commit 11629f65f1efe473ff4842b8e83cb518a1f56a0c Author: Jarek Potiuk AuthorDate: Mon Jul 31 22:23:06 2023 +0200 Fix system-test pytest marker (#32978) We have an automated system-test pytest marker that applies the pytest.mark.system marker to all system tests. It has been implemented in a strange way as it was applying the marker to all provider tests if the whole "tests" directory was used for test collection. This caused quarantine tests from providers folder to be skipped because they were automatically marked with pytest.mark.system marker. Also system tests were generally excluded from running after we brought back the "test_*" prefix. This PR updates the auto-marker to only apply system marker to tests in "system/providers" folder and adds the "example_*" prefix to the prefixes automatically collected by pytest. --- pyproject.toml | 1 + tests/system/conftest.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index de7dc5a905..630d44cf05 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,6 +103,7 @@ filterwarnings = [ ] python_files = [ "test_*.py", +"example_*.py", ] testpaths = [ "tests", diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 500a41a5c3..154e7c208f 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -52,7 +52,7 @@ def pytest_collection_modifyitems(config, items): rootdir = Path(config.rootdir) for item in items: rel_path = Path(item.fspath).relative_to(rootdir) -match = re.match(".+/providers/([^/]+)", str(rel_path)) +match = re.match(".+/system/providers/([^/]+)", str(rel_path)) if not match: continue provider = match.group(1)
[airflow] branch main updated: Handle logout by auth manager (#32819)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 2b0d88e450 Handle logout by auth manager (#32819) 2b0d88e450 is described below commit 2b0d88e450f11af8e447864ca258142a6756126d Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Jul 31 15:20:38 2023 -0400 Handle logout by auth manager (#32819) * Handle logout by auth manager --- airflow/auth/managers/base_auth_manager.py | 10 -- airflow/auth/managers/fab/fab_auth_manager.py | 6 ++ airflow/www/auth.py| 2 +- airflow/www/extensions/init_appbuilder.py | 4 airflow/www/extensions/init_security.py| 4 +--- airflow/www/templates/appbuilder/navbar_right.html | 2 +- tests/auth/managers/fab/test_fab_auth_manager.py | 11 +++ tests/www/views/test_session.py| 8 +--- 8 files changed, 25 insertions(+), 22 deletions(-) diff --git a/airflow/auth/managers/base_auth_manager.py b/airflow/auth/managers/base_auth_manager.py index a234efcaca..cb40ee9c87 100644 --- a/airflow/auth/managers/base_auth_manager.py +++ b/airflow/auth/managers/base_auth_manager.py @@ -41,32 +41,30 @@ class BaseAuthManager(LoggingMixin): @abstractmethod def get_user_name(self) -> str: """Return the username associated to the user in session.""" -... @abstractmethod def get_user(self) -> BaseUser: """Return the user associated to the user in session.""" -... @abstractmethod def get_user_id(self) -> str: """Return the user ID associated to the user in session.""" -... @abstractmethod def is_logged_in(self) -> bool: """Return whether the user is logged in.""" -... @abstractmethod def get_url_login(self, **kwargs) -> str: """Return the login page url.""" -... 
+ +@abstractmethod +def get_url_logout(self) -> str: +"""Return the logout page url.""" @abstractmethod def get_url_user_profile(self) -> str | None: """Return the url to a page displaying info about the current user.""" -... def get_security_manager_override_class(self) -> type: """ diff --git a/airflow/auth/managers/fab/fab_auth_manager.py b/airflow/auth/managers/fab/fab_auth_manager.py index 085b6e997b..7a23953b35 100644 --- a/airflow/auth/managers/fab/fab_auth_manager.py +++ b/airflow/auth/managers/fab/fab_auth_manager.py @@ -70,6 +70,12 @@ class FabAuthManager(BaseAuthManager): else: return url_for(f"{self.security_manager.auth_view.endpoint}.login") +def get_url_logout(self): +"""Return the logout page url.""" +if not self.security_manager.auth_view: +raise AirflowException("`auth_view` not defined in the security manager.") +return url_for(f"{self.security_manager.auth_view.endpoint}.logout") + def get_url_user_profile(self) -> str | None: """Return the url to a page displaying info about the current user.""" if not self.security_manager.user_view: diff --git a/airflow/www/auth.py b/airflow/www/auth.py index 46b645f197..9afe995054 100644 --- a/airflow/www/auth.py +++ b/airflow/www/auth.py @@ -54,7 +54,7 @@ def has_access(permissions: Sequence[tuple[str, str]] | None = None) -> Callable hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "redact", -logout_url=appbuilder.get_url_for_logout, +logout_url=get_auth_manager().get_url_logout(), ), 403, ) diff --git a/airflow/www/extensions/init_appbuilder.py b/airflow/www/extensions/init_appbuilder.py index 2537ea8bc7..11c358abb6 100644 --- a/airflow/www/extensions/init_appbuilder.py +++ b/airflow/www/extensions/init_appbuilder.py @@ -588,10 +588,6 @@ class AirflowAppBuilder: def get_url_for_login_with(self, next_url: str | None = None) -> str: return get_auth_manager().get_url_login(next_url=next_url) -@property -def get_url_for_logout(self): -return 
url_for(f"{self.sm.auth_view.endpoint}.logout") - @property def get_url_for_index(self): return url_for(f"{self.indexview.endpoint}.{sel
[airflow] branch main updated: Increase the number of attempts in AWS system test `example_rds_export` (#32976)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 86193f5608 Increase the number of attempts in AWS system test `example_rds_export` (#32976) 86193f5608 is described below commit 86193f560815507b9abf1008c19b133d95c4da9f Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Jul 31 15:18:57 2023 -0400 Increase the number of attempts in AWS system test `example_rds_export` (#32976) --- airflow/providers/amazon/aws/operators/rds.py | 9 - tests/system/providers/amazon/aws/example_rds_export.py | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index 7a544a8c00..a4a2ac5c33 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -392,6 +392,8 @@ class RdsCancelExportTaskOperator(RdsBaseOperator): :param export_task_identifier: The identifier of the snapshot export task to cancel :param wait_for_completion: If True, waits for DB snapshot export to cancel. 
(default: True) +:param check_interval: The amount of time in seconds to wait between attempts +:param max_attempts: The maximum number of attempts to be made """ template_fields = ("export_task_identifier",) @@ -402,6 +404,7 @@ class RdsCancelExportTaskOperator(RdsBaseOperator): export_task_identifier: str, wait_for_completion: bool = True, check_interval: int = 30, +max_attempts: int = 40, **kwargs, ): super().__init__(**kwargs) @@ -409,6 +412,7 @@ class RdsCancelExportTaskOperator(RdsBaseOperator): self.export_task_identifier = export_task_identifier self.wait_for_completion = wait_for_completion self.check_interval = check_interval +self.max_attempts = max_attempts def execute(self, context: Context) -> str: self.log.info("Canceling export task %s", self.export_task_identifier) @@ -419,7 +423,10 @@ class RdsCancelExportTaskOperator(RdsBaseOperator): if self.wait_for_completion: self.hook.wait_for_export_task_state( -self.export_task_identifier, target_state="canceled", check_interval=self.check_interval +self.export_task_identifier, +target_state="canceled", +check_interval=self.check_interval, +max_attempts=self.max_attempts, ) return json.dumps(cancel_export, default=str) diff --git a/tests/system/providers/amazon/aws/example_rds_export.py b/tests/system/providers/amazon/aws/example_rds_export.py index d103763952..a87bce10de 100644 --- a/tests/system/providers/amazon/aws/example_rds_export.py +++ b/tests/system/providers/amazon/aws/example_rds_export.py @@ -128,6 +128,7 @@ with DAG( ) # [END howto_operator_rds_cancel_export] cancel_export.check_interval = 10 +cancel_export.max_attempts = 120 # [START howto_sensor_rds_export_task_existence] export_sensor = RdsExportTaskExistenceSensor(
[airflow] branch main updated: Add GCS Requester Pays bucket support to GCSToS3Operator (#32760)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 915f9e4060 Add GCS Requester Pays bucket support to GCSToS3Operator (#32760) 915f9e4060 is described below commit 915f9e40601fbfa3ebcf2fe82ced14191b12ab18 Author: Hank Ehly AuthorDate: Tue Aug 1 02:33:52 2023 +0900 Add GCS Requester Pays bucket support to GCSToS3Operator (#32760) * Add requester pays bucket support to GCSToS3Operator * Update docstrings * isort * Fix failing unit tests * Fix failing test --- .../providers/amazon/aws/transfers/gcs_to_s3.py| 32 - airflow/providers/google/cloud/hooks/gcs.py| 52 +- airflow/providers/google/cloud/operators/gcs.py| 8 +++- .../amazon/aws/transfers/test_gcs_to_s3.py | 6 ++- tests/providers/google/cloud/hooks/test_gcs.py | 6 +-- tests/providers/google/cloud/operators/test_gcs.py | 5 ++- .../providers/amazon/aws/example_gcs_to_s3.py | 52 -- 7 files changed, 126 insertions(+), 35 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index d57de7e11e..2cdd0761a1 100644 --- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -80,6 +80,8 @@ class GCSToS3Operator(BaseOperator): on the bucket is recreated within path passed in dest_s3_key. :param match_glob: (Optional) filters objects based on the glob pattern given by the string (e.g, ``'**/*/.json'``) +:param gcp_user_project: (Optional) The identifier of the Google Cloud project to bill for this request. +Required for Requester Pays buckets. 
""" template_fields: Sequence[str] = ( @@ -88,6 +90,7 @@ class GCSToS3Operator(BaseOperator): "delimiter", "dest_s3_key", "google_impersonation_chain", +"gcp_user_project", ) ui_color = "#f0eee4" @@ -107,6 +110,7 @@ class GCSToS3Operator(BaseOperator): s3_acl_policy: str | None = None, keep_directory_structure: bool = True, match_glob: str | None = None, +gcp_user_project: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -130,10 +134,11 @@ class GCSToS3Operator(BaseOperator): self.s3_acl_policy = s3_acl_policy self.keep_directory_structure = keep_directory_structure self.match_glob = match_glob +self.gcp_user_project = gcp_user_project def execute(self, context: Context) -> list[str]: # list all files in an Google Cloud Storage bucket -hook = GCSHook( +gcs_hook = GCSHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.google_impersonation_chain, ) @@ -145,8 +150,12 @@ class GCSToS3Operator(BaseOperator): self.prefix, ) -files = hook.list( -bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter, match_glob=self.match_glob +gcs_files = gcs_hook.list( +bucket_name=self.bucket, +prefix=self.prefix, +delimiter=self.delimiter, +match_glob=self.match_glob, +user_project=self.gcp_user_project, ) s3_hook = S3Hook( @@ -173,24 +182,23 @@ class GCSToS3Operator(BaseOperator): existing_files = existing_files if existing_files is not None else [] # remove the prefix for the existing files to allow the match existing_files = [file.replace(prefix, "", 1) for file in existing_files] -files = list(set(files) - set(existing_files)) +gcs_files = list(set(gcs_files) - set(existing_files)) -if files: - -for file in files: -with hook.provide_file(object_name=file, bucket_name=self.bucket) as local_tmp_file: +if gcs_files: +for file in gcs_files: +with gcs_hook.provide_file( +object_name=file, bucket_name=self.bucket, user_project=self.gcp_user_project +) as local_tmp_file: dest_key = os.path.join(self.dest_s3_key, file) self.log.info("Saving 
file to %s", dest_key) - s3_hook.load_file( filename=local_tmp_file.name, key=dest_key, replace=self.replace, acl_policy=self.s3_acl_policy, ) - -self.log.info("All done, uploaded %d files to S3", len(files)) +self.log.info(&q
[airflow] branch main updated: Allow auth managers to override the security manager (#32525)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 031e3945e4 Allow auth managers to override the security manager (#32525) 031e3945e4 is described below commit 031e3945e44030d4f085753ffdc43dc104708f91 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Jul 24 13:46:44 2023 -0400 Allow auth managers to override the security manager (#32525) Allow auth managers to override the security manager --- airflow/api/auth/backend/session.py| 4 +- airflow/auth/managers/base_auth_manager.py | 12 ++ airflow/auth/managers/fab/fab_auth_manager.py | 5 + .../auth/managers/fab/security_manager_override.py | 220 + airflow/configuration.py | 34 +++- airflow/www/auth.py| 5 +- airflow/www/decorators.py | 4 +- airflow/www/extensions/init_appbuilder.py | 7 - .../extensions/init_auth_manager.py} | 30 +-- airflow/www/extensions/init_jinja_globals.py | 5 +- airflow/www/extensions/init_security.py| 5 +- airflow/www/fab_security/manager.py| 137 + airflow/www/fab_security/sqla/manager.py | 2 +- airflow/www/security.py| 40 +++- airflow/www/views.py | 12 +- tests/auh/managers/fab/test_fab_auth_manager.py| 4 + .../auh/managers/test_base_auth_manager.py | 27 ++- 17 files changed, 360 insertions(+), 193 deletions(-) diff --git a/airflow/api/auth/backend/session.py b/airflow/api/auth/backend/session.py index c55f748460..ef914b57e4 100644 --- a/airflow/api/auth/backend/session.py +++ b/airflow/api/auth/backend/session.py @@ -22,7 +22,7 @@ from typing import Any, Callable, TypeVar, cast from flask import Response -from airflow.configuration import auth_manager +from airflow.www.extensions.init_auth_manager import get_auth_manager CLIENT_AUTH: tuple[str, str] | Any | None = None @@ -39,7 +39,7 @@ def requires_authentication(function: T): @wraps(function) def decorated(*args, **kwargs): -if 
not auth_manager.is_logged_in(): +if not get_auth_manager().is_logged_in(): return Response("Unauthorized", 401, {}) return function(*args, **kwargs) diff --git a/airflow/auth/managers/base_auth_manager.py b/airflow/auth/managers/base_auth_manager.py index 462fe34d63..ab5356bf8c 100644 --- a/airflow/auth/managers/base_auth_manager.py +++ b/airflow/auth/managers/base_auth_manager.py @@ -38,3 +38,15 @@ class BaseAuthManager(LoggingMixin): def is_logged_in(self) -> bool: """Return whether the user is logged in.""" ... + +def get_security_manager_override_class(self) -> type: +""" +Return the security manager override class. + +The security manager override class is responsible for overriding the default security manager +class airflow.www.security.AirflowSecurityManager with a custom implementation. This class is +essentially inherited from airflow.www.security.AirflowSecurityManager. + +By default, return an empty class. +""" +return object diff --git a/airflow/auth/managers/fab/fab_auth_manager.py b/airflow/auth/managers/fab/fab_auth_manager.py index b9f0c1e1df..f90a9ac063 100644 --- a/airflow/auth/managers/fab/fab_auth_manager.py +++ b/airflow/auth/managers/fab/fab_auth_manager.py @@ -20,6 +20,7 @@ from __future__ import annotations from flask_login import current_user from airflow.auth.managers.base_auth_manager import BaseAuthManager +from airflow.auth.managers.fab.security_manager_override import FabAirflowSecurityManagerOverride class FabAuthManager(BaseAuthManager): @@ -43,3 +44,7 @@ class FabAuthManager(BaseAuthManager): def is_logged_in(self) -> bool: """Return whether the user is logged in.""" return current_user and not current_user.is_anonymous + +def get_security_manager_override_class(self) -> type: +"""Return the security manager override.""" +return FabAirflowSecurityManagerOverride diff --git a/airflow/auth/managers/fab/security_manager_override.py b/airflow/auth/managers/fab/security_manager_override.py new file mode 100644 index 00..5be9ee1f36 --- 
/dev/null +++ b/airflow/auth/managers/fab/security_manager_override.py @@ -0,0 +1,220 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# rega
[airflow] branch main updated: Fix refuse_to_run_test_from_wrongly_named_files to handle system tests (#32655)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 6064a6e3e7 Fix refuse_to_run_test_from_wrongly_named_files to handle system tests (#32655) 6064a6e3e7 is described below commit 6064a6e3e78605d1fe79e2acfa473e860312d685 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Jul 17 14:55:23 2023 -0400 Fix refuse_to_run_test_from_wrongly_named_files to handle system tests (#32655) --- tests/conftest.py | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 98dcb3be96..9fdda1467c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -909,8 +909,16 @@ def clear_lru_cache(): @pytest.fixture(autouse=True) def refuse_to_run_test_from_wrongly_named_files(request): +dirname: str = request.node.fspath.dirname filename: str = request.node.fspath.basename -if not request.node.fspath.basename.startswith("test_"): +is_system_test: bool = "tests/system/" in dirname +if is_system_test and not request.node.fspath.basename.startswith("example_"): +raise Exception( +f"All test method files in tests/system must start with 'example_'. Seems that {filename} " +f"contains {request.function} that looks like a test case. Please rename the file to " +f"follow the example_* pattern if you want to run the tests in it." +) +if not is_system_test and not request.node.fspath.basename.startswith("test_"): raise Exception( f"All test method files in tests/ must start with 'test_'. Seems that {filename} " f"contains {request.function} that looks like a test case. Please rename the file to "
[airflow] branch main updated: quick fix on RDS operator to prevent parameter collision (#32436)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 8c6751ff0f quick fix on RDS operator to prevent parameter collision (#32436) 8c6751ff0f is described below commit 8c6751ff0f2056af1cb08cec03db8a4d6c913ca7 Author: Raphaël Vandon AuthorDate: Fri Jul 7 15:39:29 2023 -0700 quick fix on RDS operator to prevent parameter collision (#32436) * quick fix on RDS operator to prevent parameter collision without this code, if the user specified a region in the hook params, it'd create an error about the param being specified twice * use initialized value --- airflow/providers/amazon/aws/operators/rds.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index 2630098bc4..c037251643 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -62,8 +62,9 @@ class RdsBaseOperator(BaseOperator): AirflowProviderDeprecationWarning, stacklevel=3, # 2 is in the operator's init, 3 is in the user code creating the operator ) -self.region_name = region_name -self.hook = RdsHook(aws_conn_id=aws_conn_id, region_name=region_name, **(hook_params or {})) +hook_params = hook_params or {} +self.region_name = region_name or hook_params.pop("region_name", None) +self.hook = RdsHook(aws_conn_id=aws_conn_id, region_name=self.region_name, **(hook_params)) super().__init__(*args, **kwargs) self._await_interval = 60 # seconds
[airflow] branch main updated (8303ad1b94 -> 723eb7d453)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 8303ad1b94 Ignore error on mssql temp removal when cleaning (#32433) add 723eb7d453 Give better link to job configuration docs in BigQueryInsertJobOperator (#31736) No new revisions were added by this update. Summary of changes: airflow/providers/google/cloud/operators/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[airflow] branch main updated: Add ferruzzi as committer (#32353)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new c2b9fcc108 Add ferruzzi as committer (#32353) c2b9fcc108 is described below commit c2b9fcc108eae4373350fd0559d6c166853bec40 Author: D. Ferruzzi AuthorDate: Tue Jul 4 13:56:01 2023 -0700 Add ferruzzi as committer (#32353) --- dev/breeze/src/airflow_breeze/global_constants.py | 1 + docs/apache-airflow/project.rst | 1 + 2 files changed, 2 insertions(+) diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index 510996a4b5..0a66f7d98c 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -225,6 +225,7 @@ COMMITTERS = [ "ephraimbuddy", "feluelle", "feng-tao", +"ferruzzi", "houqp", "hussein-awala", "jedcunningham", diff --git a/docs/apache-airflow/project.rst b/docs/apache-airflow/project.rst index 75bc4b2e7f..3516a6b8c0 100644 --- a/docs/apache-airflow/project.rst +++ b/docs/apache-airflow/project.rst @@ -51,6 +51,7 @@ Committers - Dan Davydov (@aoen) - Daniel Imberman (@dimberman) - Daniel Standish (@dstandish) +- Dennis Ferruzzi (@ferruzzi) - Elad Kalif (@eladkal) - Ephraim Anierobi (@ephraimbuddy) - Felix Uellendall (@feluelle)
[airflow] branch main updated: deprecate arbitrary parameter passing to RDS hook (#32352)
This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 5623a21a1f deprecate arbitrary parameter passing to RDS hook (#32352) 5623a21a1f is described below commit 5623a21a1fc738ccb97ade4d4197b181bf1128d4 Author: Raphaël Vandon AuthorDate: Tue Jul 4 13:28:15 2023 -0700 deprecate arbitrary parameter passing to RDS hook (#32352) * deprecate arbitrary parameter passing to RDS hook * add url to PR --- airflow/providers/amazon/aws/operators/rds.py | 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index 440d895afa..9aef670166 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -18,12 +18,13 @@ from __future__ import annotations import json +import warnings from datetime import timedelta from typing import TYPE_CHECKING, Sequence from mypy_boto3_rds.type_defs import TagTypeDef -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import BaseOperator from airflow.providers.amazon.aws.hooks.rds import RdsHook from airflow.providers.amazon.aws.triggers.rds import RdsDbInstanceTrigger @@ -42,6 +43,15 @@ class RdsBaseOperator(BaseOperator): ui_fgcolor = "#ff" def __init__(self, *args, aws_conn_id: str = "aws_conn_id", hook_params: dict | None = None, **kwargs): +if hook_params is not None: +warnings.warn( +"The parameter hook_params is deprecated and will be removed. " +"If you were using it, please get in touch either on airflow slack, " +"or by opening a github issue on the project. 
" +"You can mention https://github.com/apache/airflow/pull/32352", +AirflowProviderDeprecationWarning, +stacklevel=3, # 2 is in the operator's init, 3 is in the user code creating the operator +) self.hook_params = hook_params or {} self.hook = RdsHook(aws_conn_id=aws_conn_id, **self.hook_params) super().__init__(*args, **kwargs)