(airflow) branch main updated: Handle `AUTH_ROLE_PUBLIC` in FAB auth manager (#42280)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 8741e9c176 Handle `AUTH_ROLE_PUBLIC` in FAB auth manager (#42280) 8741e9c176 is described below commit 8741e9c1761931c7cff135d53b589053a04f58c1 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Fri Sep 20 13:52:43 2024 -0700 Handle `AUTH_ROLE_PUBLIC` in FAB auth manager (#42280) --- .../endpoints/forward_to_fab_endpoint.py | 131 -- airflow/api_connexion/openapi/v1.yaml | 310 --- airflow/api_connexion/security.py | 5 - .../config_templates/default_webserver_config.py | 2 +- .../auth_manager/api/auth/backend/basic_auth.py| 6 +- .../auth_manager/api/auth/backend/kerberos_auth.py | 6 +- .../providers/fab/auth_manager/fab_auth_manager.py | 17 +- .../fab/auth_manager/models/anonymous_user.py | 8 +- .../fab/auth_manager/security_manager/override.py | 6 +- airflow/www/static/js/types/api-generated.ts | 441 - airflow/www/utils.py | 11 +- airflow/www/views.py | 4 +- newsfragments/42280.significant.rst| 5 + tests/api_connexion/conftest.py| 11 - .../endpoints/test_config_endpoint.py | 22 - .../endpoints/test_connection_endpoint.py | 89 - tests/api_connexion/endpoints/test_dag_endpoint.py | 99 - .../endpoints/test_dag_run_endpoint.py | 185 - .../endpoints/test_dag_source_endpoint.py | 16 - .../endpoints/test_dag_warning_endpoint.py | 12 - .../endpoints/test_dataset_endpoint.py | 184 - .../endpoints/test_event_log_endpoint.py | 44 -- .../endpoints/test_forward_to_fab_endpoint.py | 238 --- .../test_role_and_permission_endpoint.py | 12 +- .../api_endpoints/test_user_endpoint.py| 1 + tests/providers/fab/auth_manager/conftest.py | 9 +- .../fab/auth_manager/test_fab_auth_manager.py | 10 +- 27 files changed, 60 insertions(+), 1824 deletions(-) diff --git a/airflow/api_connexion/endpoints/forward_to_fab_endpoint.py 
b/airflow/api_connexion/endpoints/forward_to_fab_endpoint.py deleted file mode 100644 index 9785a5b053..00 --- a/airflow/api_connexion/endpoints/forward_to_fab_endpoint.py +++ /dev/null @@ -1,131 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import warnings -from typing import TYPE_CHECKING - -from airflow.api_connexion.exceptions import BadRequest -from airflow.providers.fab.auth_manager.api_endpoints import role_and_permission_endpoint, user_endpoint -from airflow.www.extensions.init_auth_manager import get_auth_manager - -if TYPE_CHECKING: -from typing import Callable - -from airflow.api_connexion.types import APIResponse - - -def _require_fab(func: Callable) -> Callable: -""" -Raise an HTTP error 400 if the auth manager is not FAB. - -Intended to decorate endpoints that have been migrated from Airflow API to FAB API. -""" - -def inner(*args, **kwargs): -from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager - -auth_mgr = get_auth_manager() -if not isinstance(auth_mgr, FabAuthManager): -raise BadRequest( -detail="This endpoint is only available when using the default auth manager FabAuthManager." -) -else: -warnings.warn( -"This API endpoint is deprecated. 
" -"Please use the API under /auth/fab/v1 instead for this operation.", -DeprecationWarning, -stacklevel=1, # This decorator wrapped multiple times, better point to this file -) -return func(*args, **kwargs) - -return inner - - -### role - - -@_require_fab -def get_role(**kwargs) ->
(airflow) branch main updated (58b3771bf0 -> 4d7fad3bda)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 58b3771bf0 Add documentation for FAB DB commands (#42352) add 4d7fad3bda Remove empty dirs (#42375) No new revisions were added by this update. Summary of changes: airflow/auth/managers/fab/__init__.py | 17 - airflow/auth/managers/fab/api/__init__.py | 17 - airflow/auth/managers/fab/api/auth/__init__.py | 17 - airflow/auth/managers/fab/api/auth/backend/__init__.py | 17 - airflow/auth/managers/fab/security_manager/__init__.py | 17 - tests/auth/managers/fab/__init__.py| 16 6 files changed, 101 deletions(-) delete mode 100644 airflow/auth/managers/fab/__init__.py delete mode 100644 airflow/auth/managers/fab/api/__init__.py delete mode 100644 airflow/auth/managers/fab/api/auth/__init__.py delete mode 100644 airflow/auth/managers/fab/api/auth/backend/__init__.py delete mode 100644 airflow/auth/managers/fab/security_manager/__init__.py delete mode 100644 tests/auth/managers/fab/__init__.py
(airflow) branch v2-10-test updated: Use `selectinload` in trigger (#40487) (#42351)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch v2-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/v2-10-test by this push: new 893261043c Use `selectinload` in trigger (#40487) (#42351) 893261043c is described below commit 893261043c063a9adc2c7e35d83ad5b9b829adef Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Thu Sep 19 10:23:43 2024 -0700 Use `selectinload` in trigger (#40487) (#42351) (cherry picked from commit 46b41e332f2d32962c0e3c14c1e5b8d052dfc359) Co-authored-by: Joseph Ang --- airflow/models/trigger.py | 6 +++--- docs/spelling_wordlist.txt | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index 8c4dee7ebc..43d9c515f0 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -21,7 +21,7 @@ from traceback import format_exception from typing import TYPE_CHECKING, Any, Iterable from sqlalchemy import Column, Integer, String, Text, delete, func, or_, select, update -from sqlalchemy.orm import joinedload, relationship +from sqlalchemy.orm import relationship, selectinload from sqlalchemy.sql.functions import coalesce from airflow.api_internal.internal_api_call import internal_api_call @@ -75,7 +75,7 @@ class Trigger(Base): uselist=False, ) -task_instance = relationship("TaskInstance", back_populates="trigger", lazy="joined", uselist=False) +task_instance = relationship("TaskInstance", back_populates="trigger", lazy="selectin", uselist=False) def __init__( self, @@ -152,7 +152,7 @@ class Trigger(Base): select(cls) .where(cls.id.in_(ids)) .options( -joinedload(cls.task_instance) +selectinload(cls.task_instance) .joinedload(TaskInstance.trigger) .joinedload(Trigger.triggerer_job) ) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 29d5f6d58a..1ca4c8dc45 100644 --- a/docs/spelling_wordlist.txt +++ 
b/docs/spelling_wordlist.txt @@ -1439,6 +1439,7 @@ Seedlist seedlist seekable segmentGranularity +selectin Sendgrid sendgrid sentimentMax
(airflow) branch main updated (e757345452 -> 5cb5c9e874)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from e757345452 Fix example_bigtable system test (#42250) add 5cb5c9e874 Fix automl vision classification test (#42257) No new revisions were added by this update. Summary of changes: .../automl/example_automl_vision_classification.py | 43 +++--- 1 file changed, 6 insertions(+), 37 deletions(-)
(airflow) branch main updated (1e71096571 -> d7c49242c6)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 1e71096571 Fix test and provide proper dataset (#42259) add d7c49242c6 Add unit test to catch timeout errors. (#42269) No new revisions were added by this update. Summary of changes: tests/models/test_dagbag.py | 40 1 file changed, 40 insertions(+)
(airflow) branch main updated: Fix test and provide proper dataset (#42259)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 1e71096571 Fix test and provide proper dataset (#42259) 1e71096571 is described below commit 1e71096571afe528be76ea1e971f597a7fc83220 Author: olegkachur-e AuthorDate: Wed Sep 18 21:51:21 2024 +0200 Fix test and provide proper dataset (#42259) --- .../example_automl_vision_object_detection.py | 44 -- 1 file changed, 7 insertions(+), 37 deletions(-) diff --git a/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py b/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py index 061b8efea6..2f92e81d9a 100644 --- a/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py +++ b/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py @@ -28,11 +28,6 @@ from google.cloud.aiplatform import schema from google.protobuf.struct_pb2 import Value from airflow.models.dag import DAG -from airflow.providers.google.cloud.operators.gcs import ( -GCSCreateBucketOperator, -GCSDeleteBucketOperator, -GCSSynchronizeBucketsOperator, -) from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import ( CreateAutoMLImageTrainingJobOperator, DeleteAutoMLTrainingJobOperator, @@ -46,27 +41,28 @@ from airflow.utils.trigger_rule import TriggerRule ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") -DAG_ID = "example_automl_vision_obj_detect" +DAG_ID = "automl_vision_obj_detect" REGION = "us-central1" IMAGE_DISPLAY_NAME = f"automl-vision-detect-{ENV_ID}" MODEL_DISPLAY_NAME = f"automl-vision-detect-model-{ENV_ID}" - RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" -IMAGE_GCS_BUCKET_NAME = f"bucket_image_detect_{ENV_ID}".replace("_", "-") IMAGE_DATASET = { 
"display_name": f"image-detect-dataset-{ENV_ID}", "metadata_schema_uri": schema.dataset.metadata.image, "metadata": Value(string_value="image-dataset"), } + IMAGE_DATA_CONFIG = [ +# For testing only { "import_schema_uri": schema.dataset.ioformat.image.bounding_box, -"gcs_source": {"uris": [f"gs://{IMAGE_GCS_BUCKET_NAME}/automl/object_detection.csv"]}, +"gcs_source": { +"uris": [f"gs://{RESOURCE_DATA_BUCKET}/automl/datasets/vision/obj_detection_short.csv"] +}, }, ] - # Example DAG for AutoML Vision Object Detection with DAG( DAG_ID, @@ -75,22 +71,6 @@ with DAG( catchup=False, tags=["example", "automl", "vision", "object-detection"], ) as dag: -create_bucket = GCSCreateBucketOperator( -task_id="create_bucket", -bucket_name=IMAGE_GCS_BUCKET_NAME, -storage_class="REGIONAL", -location=REGION, -) - -move_dataset_file = GCSSynchronizeBucketsOperator( -task_id="move_dataset_to_bucket", -source_bucket=RESOURCE_DATA_BUCKET, -source_object="automl/datasets/vision", -destination_bucket=IMAGE_GCS_BUCKET_NAME, -destination_object="automl", -recursive=True, -) - create_image_dataset = CreateDatasetOperator( task_id="image_dataset", dataset=IMAGE_DATASET, @@ -143,25 +123,15 @@ with DAG( trigger_rule=TriggerRule.ALL_DONE, ) -delete_bucket = GCSDeleteBucketOperator( -task_id="delete_bucket", -bucket_name=IMAGE_GCS_BUCKET_NAME, -trigger_rule=TriggerRule.ALL_DONE, -) - ( # TEST SETUP -[ -create_bucket >> move_dataset_file, -create_image_dataset, -] +create_image_dataset >> import_image_dataset # TEST BODY >> create_auto_ml_image_training_job # TEST TEARDOWN >> delete_auto_ml_image_training_job >> delete_image_dataset ->> delete_bucket ) from tests.system.utils.watcher import watcher
(airflow) branch main updated: Fix example_bigtable system test (#42250)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new e757345452 Fix example_bigtable system test (#42250) e757345452 is described below commit e757345452ea97fcdd307b44eed01d43c104f0b5 Author: olegkachur-e AuthorDate: Wed Sep 18 21:25:31 2024 +0200 Fix example_bigtable system test (#42250) --- .../google/cloud/bigtable/example_bigtable.py | 58 +- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/tests/system/providers/google/cloud/bigtable/example_bigtable.py b/tests/system/providers/google/cloud/bigtable/example_bigtable.py index e99cd62ae5..01f2faff80 100644 --- a/tests/system/providers/google/cloud/bigtable/example_bigtable.py +++ b/tests/system/providers/google/cloud/bigtable/example_bigtable.py @@ -67,20 +67,22 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID DAG_ID = "bigtable" - -CBT_INSTANCE_ID = f"bigtable-instance-id-{ENV_ID}" -CBT_INSTANCE_DISPLAY_NAME = "Instance-name" -CBT_INSTANCE_DISPLAY_NAME_UPDATED = f"{CBT_INSTANCE_DISPLAY_NAME} - updated" +# CBT instance id str full length should be between [6,33], lowercase only, "_" symbol is forbidden +CBT_INSTANCE_ID_BASE = f"inst-id-{str(ENV_ID)[:20]}".lower().replace("_", "-") +CBT_INSTANCE_ID_1 = f"{CBT_INSTANCE_ID_BASE}-1" +CBT_INSTANCE_ID_2 = f"{CBT_INSTANCE_ID_BASE}-2" +CBT_INSTANCE_DISPLAY_NAME = "instance-name" +CBT_INSTANCE_DISPLAY_NAME_UPDATED = f"{CBT_INSTANCE_DISPLAY_NAME}-updated" CBT_INSTANCE_TYPE = enums.Instance.Type.DEVELOPMENT -CBT_INSTANCE_TYPE_PROD = 1 +CBT_INSTANCE_TYPE_PROD = enums.Instance.Type.PRODUCTION CBT_INSTANCE_LABELS: dict[str, str] = {} CBT_INSTANCE_LABELS_UPDATED = {"env": "prod"} -CBT_CLUSTER_ID = f"bigtable-cluster-id-{ENV_ID}" +CBT_CLUSTER_ID = "bigtable-cluster-id" CBT_CLUSTER_ZONE 
= "europe-west1-b" CBT_CLUSTER_NODES = 3 CBT_CLUSTER_NODES_UPDATED = 5 CBT_CLUSTER_STORAGE_TYPE = enums.StorageType.HDD -CBT_TABLE_ID = f"bigtable-table-id{ENV_ID}" +CBT_TABLE_ID = "bigtable-table-id" CBT_POKE_INTERVAL = 60 @@ -94,7 +96,7 @@ with DAG( # [START howto_operator_gcp_bigtable_instance_create] create_instance_task = BigtableCreateInstanceOperator( project_id=PROJECT_ID, -instance_id=CBT_INSTANCE_ID, +instance_id=CBT_INSTANCE_ID_1, main_cluster_id=CBT_CLUSTER_ID, main_cluster_zone=CBT_CLUSTER_ZONE, instance_display_name=CBT_INSTANCE_DISPLAY_NAME, @@ -104,8 +106,10 @@ with DAG( cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, # type: ignore[arg-type] task_id="create_instance_task", ) +# [END howto_operator_gcp_bigtable_instance_create] + create_instance_task2 = BigtableCreateInstanceOperator( -instance_id=CBT_INSTANCE_ID, +instance_id=CBT_INSTANCE_ID_2, main_cluster_id=CBT_CLUSTER_ID, main_cluster_zone=CBT_CLUSTER_ZONE, instance_display_name=CBT_INSTANCE_DISPLAY_NAME, @@ -115,23 +119,24 @@ with DAG( cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, # type: ignore[arg-type] task_id="create_instance_task2", ) -# [END howto_operator_gcp_bigtable_instance_create] @task_group() def create_tables(): # [START howto_operator_gcp_bigtable_table_create] create_table_task = BigtableCreateTableOperator( project_id=PROJECT_ID, -instance_id=CBT_INSTANCE_ID, +instance_id=CBT_INSTANCE_ID_1, table_id=CBT_TABLE_ID, task_id="create_table", ) +# [END howto_operator_gcp_bigtable_table_create] + create_table_task2 = BigtableCreateTableOperator( -instance_id=CBT_INSTANCE_ID, +instance_id=CBT_INSTANCE_ID_2, table_id=CBT_TABLE_ID, task_id="create_table_task2", ) -# [END howto_operator_gcp_bigtable_table_create] + create_table_task >> create_table_task2 @task_group() @@ -139,22 +144,22 @@ with DAG( # [START howto_operator_gcp_bigtable_cluster_update] cluster_update_task = BigtableUpdateClusterOperator( project_id=PROJECT_ID, -instance_id=CBT_INSTANCE_ID, +instance_id=CBT_INSTANCE_ID_1, 
cluster_id=CBT_CLUSTER_ID, nodes=CBT_CLUSTER_NODES_UPDATED, task_id="update_cluster_task", ) +# [END howto_operator_gcp_bigtable_cluster_update] cluster_update_task2 = Bigtable
(airflow) branch main updated (39164a367d -> 46b41e332f)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 39164a367d Enable HPA for Airflow Webserver (#41955) add 46b41e332f Use `selectinload` in trigger (#40487) No new revisions were added by this update. Summary of changes: airflow/models/trigger.py | 6 +++--- docs/spelling_wordlist.txt | 1 + 2 files changed, 4 insertions(+), 3 deletions(-)
(airflow) branch main updated: In case a provider is not ready, also run lowest dependency tests (#42265)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 2ebd8b5e92 In case a provider is not ready, also run lowest dependency tests (#42265) 2ebd8b5e92 is described below commit 2ebd8b5e927ff6246838a3e556a3c4c8b1f89deb Author: Jens Scheffler <95105677+jsche...@users.noreply.github.com> AuthorDate: Wed Sep 18 16:20:02 2024 +0200 In case a provider is not ready, also run lowest dependency tests (#42265) --- dev/breeze/src/airflow_breeze/utils/selective_checks.py | 4 +++- dev/breeze/tests/test_selective_checks.py | 6 -- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py b/dev/breeze/src/airflow_breeze/utils/selective_checks.py index 653d71f9a4..3b3408d06c 100644 --- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py +++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py @@ -873,7 +873,9 @@ class SelectiveChecks: current_test_types = set(self._get_test_types_to_run(split_to_individual_providers=True)) if "Providers" in current_test_types: current_test_types.remove("Providers") -current_test_types.update({f"Providers[{provider}]" for provider in get_available_packages()}) +current_test_types.update( +{f"Providers[{provider}]" for provider in get_available_packages(include_not_ready=True)} +) if self.skip_provider_tests: current_test_types = { test_type for test_type in current_test_types if not test_type.startswith("Providers") diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index ac03f57ba3..6161c44f6e 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -43,7 +43,9 @@ ANSI_COLORS_MATCHER = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]") ALL_DOCS_SELECTED_FOR_BUILD = "" ALL_PROVIDERS_AFFECTED = 
"" -LIST_OF_ALL_PROVIDER_TESTS = " ".join(f"Providers[{provider}]" for provider in get_available_packages()) +LIST_OF_ALL_PROVIDER_TESTS = " ".join( +f"Providers[{provider}]" for provider in get_available_packages(include_not_ready=True) +) # commit that is neutral - allows to keep pyproject.toml-changing PRS neutral for unit tests @@ -1119,7 +1121,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", }, -id="Everything should run including full providers when" +id="Everything should run including full providers when " "full tests are needed even if no files are changed", ) ),
(airflow) branch main updated (20ea6b7598 -> ac9a9f5c19)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 20ea6b7598 Airflow Standard Provider (#41564) add ac9a9f5c19 Remove reference to 'scheduler_loop.jpg' in _run_scheduler_loop docstring (#42300) No new revisions were added by this update. Summary of changes: airflow/jobs/scheduler_job_runner.py | 5 - 1 file changed, 5 deletions(-)
(airflow) branch main updated (b599d81121 -> a660636baa)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from b599d81121 Implement `SimpleAuthManager` (#42004) add a660636baa Adding How to commit to Contributor Quickstart (#42181) No new revisions were added by this update. Summary of changes: contributing-docs/10_working_with_git.rst | 24 1 file changed, 20 insertions(+), 4 deletions(-)
(airflow) branch main updated: Implement `SimpleAuthManager` (#42004)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new b599d81121 Implement `SimpleAuthManager` (#42004) b599d81121 is described below commit b599d81121f3b9103658fba9c33d6f3bebe33cf6 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Tue Sep 17 09:24:25 2024 -0700 Implement `SimpleAuthManager` (#42004) --- airflow/auth/managers/simple/__init__.py | 17 ++ .../auth/managers/simple/simple_auth_manager.py| 238 ++ airflow/auth/managers/simple/user.py | 41 +++ airflow/auth/managers/simple/views/__init__.py | 17 ++ airflow/auth/managers/simple/views/auth.py | 88 +++ .../config_templates/default_webserver_config.py | 17 ++ airflow/www/app.py | 4 - airflow/www/extensions/init_auth_manager.py| 1 + airflow/www/static/js/login/Form.tsx | 44 airflow/www/static/js/login/index.test.tsx | 42 airflow/www/static/js/login/index.tsx | 73 ++ airflow/www/templates/airflow/login.html | 45 airflow/www/templates/appbuilder/navbar.html | 2 + airflow/www/webpack.config.js | 1 + tests/auth/managers/simple/__init__.py | 16 ++ .../managers/simple/test_simple_auth_manager.py| 274 + tests/auth/managers/simple/test_user.py| 37 +++ tests/auth/managers/simple/views/__init__.py | 16 ++ tests/auth/managers/simple/views/test_auth.py | 76 ++ 19 files changed, 1045 insertions(+), 4 deletions(-) diff --git a/airflow/auth/managers/simple/__init__.py b/airflow/auth/managers/simple/__init__.py new file mode 100644 index 00..217e5db960 --- /dev/null +++ b/airflow/auth/managers/simple/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/auth/managers/simple/simple_auth_manager.py b/airflow/auth/managers/simple/simple_auth_manager.py new file mode 100644 index 00..1d73341719 --- /dev/null +++ b/airflow/auth/managers/simple/simple_auth_manager.py @@ -0,0 +1,238 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import json +import os +import random +from collections import namedtuple +from enum import Enum +from typing import TYPE_CHECKING + +from flask import session, url_for +from termcolor import colored + +from airflow.auth.managers.base_auth_manager import BaseAuthManager, ResourceMethod +from airflow.auth.managers.simple.views.auth import SimpleAuthManagerAuthenticationViews +from hatch_build import AIRFLOW_ROOT_PATH + +if TYPE_CHECKING: +from airflow.auth.managers.models.base_user import BaseUser +from airflow.auth.managers.models.resource_details import ( +AccessView, +ConfigurationDetails, +ConnectionDetails, +DagAccessEntity, +DagDetails, +DatasetDetails, +PoolDetails, +VariableDetails, +) +from airflow.auth.managers.simple.user import SimpleAuthManagerUser + + +class SimpleAuthManagerRole(namedtuple("SimpleAuthManagerRole", "name order"), Enum): +""" +List of pre-defined roles in
(airflow) branch main updated: Deprecated database configuration removed (#42126)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 7dea23fd23 Deprecated database configuration removed (#42126) 7dea23fd23 is described below commit 7dea23fd23fa347143fded6979879cf5f58fa132 Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Tue Sep 17 20:07:25 2024 +0530 Deprecated database configuration removed (#42126) --- airflow/configuration.py | 15 +-- docs/apache-airflow/faq.rst | 2 +- newsfragments/42126.significant.rst | 14 ++ tests/api_connexion/endpoints/test_config_endpoint.py | 13 - tests/core/test_configuration.py | 4 +++- 5 files changed, 27 insertions(+), 21 deletions(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index 9da7a99fc1..f50e192683 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -325,20 +325,7 @@ class AirflowConfigParser(ConfigParser): # A mapping of (new section, new option) -> (old section, old option, since_version). # When reading new option, the old option will be checked to see if it exists. 
If it does a # DeprecationWarning will be issued and the old option will be used instead -deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { -("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"), -("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"), -("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"), -("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"), -("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"), -("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"), -("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"), -("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"), -("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"), -("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"), -("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"), -("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"), -} +deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {} # A mapping of new section -> (old section, since_version). deprecated_sections: dict[str, tuple[str, str]] = {} diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index 791cfc39a0..0b2c76765e 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -494,7 +494,7 @@ What does "MySQL Server has gone away" mean? You may occasionally experience ``OperationalError`` with the message "MySQL Server has gone away". This is due to the connection pool keeping connections open too long and you are given an old connection that has expired. 
To ensure a -valid connection, you can set :ref:`config:core__sql_alchemy_pool_recycle` to ensure connections are invalidated after +valid connection, you can set :ref:`config:database__sql_alchemy_pool_recycle` to ensure connections are invalidated after that many seconds and new ones are created. diff --git a/newsfragments/42126.significant.rst b/newsfragments/42126.significant.rst new file mode 100644 index 00..b7fe76179b --- /dev/null +++ b/newsfragments/42126.significant.rst @@ -0,0 +1,14 @@ +Removed deprecated database configuration. + + * Removed deprecated configuration ``sql_alchemy_conn`` from ``core``. Please use ``sql_alchemy_conn`` from ``database`` instead. + * Removed deprecated configuration ``sql_engine_encoding`` from ``core``. Please use ``sql_engine_encoding`` from ``database`` instead. + * Removed deprecated configuration ``sql_engine_collation_for_ids`` from ``core``. Please use ``sql_engine_collation_for_ids`` from ``database`` instead. + * Removed deprecated con
(airflow) branch v2-10-test updated: Fix documentation for cpu and memory usage (#42147) (#42256)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch v2-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/v2-10-test by this push: new 583041b5a4 Fix documentation for cpu and memory usage (#42147) (#42256) 583041b5a4 is described below commit 583041b5a43f1a3dbf70958eec2ad76493ec7b01 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Sep 16 09:11:14 2024 -0700 Fix documentation for cpu and memory usage (#42147) (#42256) These metrics are reported in airflow/task/task_runner/standard_task_runner.py without "percent" (cherry picked from commit 3f6497b528a9ca50c363f80b74025046aaec4215) Co-authored-by: lucasmo --- .../administration-and-deployment/logging-monitoring/metrics.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index f3e69a9541..c8522bee3b 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -246,8 +246,8 @@ Name Description ``pool.scheduled_slots`` Number of scheduled slots in the pool. Metric with pool_name tagging. ``pool.starving_tasks.<pool_name>`` Number of starving tasks in the pool ``pool.starving_tasks`` Number of starving tasks in the pool. Metric with pool_name tagging. -``task.cpu_usage_percent.<dag_id>.<task_id>`` Percentage of CPU used by a task -``task.mem_usage_percent.<dag_id>.<task_id>`` Percentage of memory used by a task +``task.cpu_usage.<dag_id>.<task_id>`` Percentage of CPU used by a task +``task.mem_usage.<dag_id>.<task_id>`` Percentage of memory used by a task ``triggers.running.<hostname>`` Number of triggers currently running for a triggerer (described by hostname) ``triggers.running`` Number of triggers currently running for a triggerer (described by hostname). 
Metric with hostname tagging.
(airflow) branch main updated: Deprecated configuration removed (#42129)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new d1e500c450 Deprecated configuration removed (#42129) d1e500c450 is described below commit d1e500c45069dc42254d55d8175e2c494cb41167 Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Mon Sep 16 20:50:34 2024 +0530 Deprecated configuration removed (#42129) --- airflow/configuration.py | 20 +--- .../cncf/kubernetes/executors/kubernetes_executor.py | 4 +++- .../providers/fab/auth_manager/fab_auth_manager.py | 12 +--- newsfragments/42129.significant.rst | 17 + tests/core/test_configuration.py | 15 +++ .../kubernetes/executors/test_kubernetes_executor.py | 14 +++--- tests/providers/cncf/kubernetes/test_client.py | 4 +++- 7 files changed, 47 insertions(+), 39 deletions(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index 4238d59054..9da7a99fc1 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -326,15 +326,6 @@ class AirflowConfigParser(ConfigParser): # When reading new option, the old option will be checked to see if it exists. 
If it does a # DeprecationWarning will be issued and the old option will be used instead deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { -("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"), -("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"), -("operators", "default_queue"): ("celery", "default_queue", "2.1.0"), -("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"), -("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"), -("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"), -("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"), -("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"), -("api", "auth_backends"): ("api", "auth_backend", "2.3.0"), ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"), ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"), ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"), @@ -347,19 +338,10 @@ class AirflowConfigParser(ConfigParser): ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"), ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"), ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"), -("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"), -("scheduler", "task_queued_timeout_check_interval"): ( -"kubernetes_executor", -"worker_pods_pending_timeout_check_interval", -"2.6.0", -), -("fab", "update_fab_perms"): ("webserver", "update_fab_perms", "2.9.0"), -("fab", "auth_rate_limited"): ("webserver", "auth_rate_limited", "2.9.0"), -("fab", "auth_rate_limit"): ("webserver", "auth_rate_limit", "2.9.0"), } # A mapping of new section -> (old section, since_version). 
-deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")} +deprecated_sections: dict[str, tuple[str, str]] = {} # Now build the inverse so we can go from old_section/old_key to new_section/new_key # if someone tries to retrieve it based on old_section/old_key di
(airflow) branch main updated (9a1f9e90a5 -> a094f9105c)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 9a1f9e90a5 Fix require_confirmation_dag_change (#42063) add a094f9105c Move `is_active` user property to FAB auth manager (#42042) No new revisions were added by this update. Summary of changes: airflow/auth/managers/models/base_user.py | 4 airflow/providers/fab/auth_manager/fab_auth_manager.py| 3 ++- airflow/www/app.py| 2 -- airflow/www/extensions/init_security.py | 13 - newsfragments/42042.significant.rst | 1 + tests/providers/fab/auth_manager/test_fab_auth_manager.py | 9 + tests/test_utils/decorators.py| 1 - tests/www/views/conftest.py | 1 - tests/www/views/test_views_rate_limit.py | 1 - 9 files changed, 12 insertions(+), 23 deletions(-) create mode 100644 newsfragments/42042.significant.rst
(airflow) branch main updated: docs: Remove outdated 'executor' reference from run() method docstring (#42121)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 97d3009115 docs: Remove outdated 'executor' reference from run() method docstring (#42121) 97d3009115 is described below commit 97d3009115edc2e6a03eca8f81a4fa3a23b72b37 Author: Wonseok Yang AuthorDate: Wed Sep 11 02:27:53 2024 +0900 docs: Remove outdated 'executor' reference from run() method docstring (#42121) --- airflow/models/dag.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 56f7dc89d2..95a2f8b6e3 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2321,7 +2321,6 @@ class DAG(LoggingMixin): :param end_date: the end date of the range to run :param mark_success: True to mark jobs as succeeded without running them :param local: True to run the tasks using the LocalExecutor -:param executor: The executor instance to run the tasks :param donot_pickle: True to avoid pickling DAG object and send to workers :param ignore_task_deps: True to skip upstream tasks :param ignore_first_depends_on_past: True to ignore depends_on_past
(airflow) branch main updated: Fix simple typo in the documentation. (#42058)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new ccc897afc1 Fix simple typo in the documentation. (#42058) ccc897afc1 is described below commit ccc897afc16abcd504781e46edf1810eabc501ae Author: Hyunsoo Kang AuthorDate: Wed Sep 11 02:15:46 2024 +0900 Fix simple typo in the documentation. (#42058) --- docs/apache-airflow/howto/custom-operator.rst | 44 +-- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/apache-airflow/howto/custom-operator.rst b/docs/apache-airflow/howto/custom-operator.rst index ce32654a6b..f2b8db712d 100644 --- a/docs/apache-airflow/howto/custom-operator.rst +++ b/docs/apache-airflow/howto/custom-operator.rst @@ -281,44 +281,44 @@ templated field: .. code-block:: python class HelloOperator(BaseOperator): -template_fields = "field_a" +template_fields = "foo" -def __init__(field_a_id) -> None: # <- should be def __init__(field_a)-> None -self.field_a = field_a_id # <- should be self.field_a = field_a +def __init__(self, foo_id) -> None: # should be def __init__(self, foo) -> None +self.foo = foo_id # should be self.foo = foo 2. Templated fields' instance members must be assigned with their corresponding parameter from the constructor, either by a direct assignment or by calling the parent's constructor (in which these fields are defined as ``template_fields``) with explicit an assignment of the parameter. -The following example is invalid, as the instance member ``self.field_a`` is not assigned at all, despite being a +The following example is invalid, as the instance member ``self.foo`` is not assigned at all, despite being a templated field: .. 
code-block:: python class HelloOperator(BaseOperator): -template_fields = ("field_a", "field_b") +template_fields = ("foo", "bar") -def __init__(field_a, field_b) -> None: -self.field_b = field_b +def __init__(self, foo, bar) -> None: +self.bar = bar -The following example is also invalid, as the instance member ``self.field_a`` of ``MyHelloOperator`` is initialized +The following example is also invalid, as the instance member ``self.foo`` of ``MyHelloOperator`` is initialized implicitly as part of the ``kwargs`` passed to its parent constructor: .. code-block:: python class HelloOperator(BaseOperator): -template_fields = "field_a" +template_fields = "foo" -def __init__(field_a) -> None: -self.field_a = field_a +def __init__(self, foo) -> None: +self.foo = foo class MyHelloOperator(HelloOperator): -template_fields = ("field_a", "field_b") +template_fields = ("foo", "bar") -def __init__(field_b, **kwargs) -> None: # <- should be def __init__(field_a, field_b, **kwargs) -super().__init__(**kwargs) # <- should be super().__init__(field_a=field_a, **kwargs) -self.field_b = field_b +def __init__(self, bar, **kwargs) -> None: # should be def __init__(self, foo, bar, **kwargs) +super().__init__(**kwargs) # should be super().__init__(foo=foo, **kwargs) +self.bar = bar 3. Applying actions on the parameter during the assignment in the constructor is not allowed. Any action on the value should be applied in the ``execute()`` method. @@ -327,10 +327,10 @@ Therefore, the following example is invalid: .. code-block:: python class HelloOperator(BaseOperator): -template_fields = "field_a" +template_fields = "foo" -def __init__(field_a) -> None: -self.field_a = field_a.lower() # <- assignment should be only self.field_a = field_a +def __init__(self, foo) -> None: +self.foo = foo.lower() # assignment should be only self.foo = foo When an operator inherits from a base operator and does not have a constructor defined on its own, the limitations above do not apply. 
However, the templated fields must be set properly in the parent according to those limitations. @@ -340,14 +340,14 @@ Thus, the following example is valid: .. code-block:: python class HelloOperator(BaseOperator): -template_fields = "field_a" +template_fields = "foo" -def __init__(field_a) -> None: -self.field_a = field_a +
(airflow) branch main updated (0a81eb08d57 -> ff718bd7e3a)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 0a81eb08d57 Fix failing compatibility test: importing missing saml for old airflows (#42113) add ff718bd7e3a Aws executor docs update (#42092) No new revisions were added by this update. Summary of changes: docs/apache-airflow-providers-amazon/executors/general.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(airflow) branch main updated: Fix failing compatibility test: importing missing saml for old airflows (#42113)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 0a81eb08d57 Fix failing compatibility test: importing missing saml for old airflows (#42113) 0a81eb08d57 is described below commit 0a81eb08d57215b7354231b54558173b821497ea Author: Jarek Potiuk AuthorDate: Mon Sep 9 13:15:30 2024 -0700 Fix failing compatibility test: importing missing saml for old airflows (#42113) The Compatibility tests for AWS are failing after recent changes as they attempt to import saml library before skipping the tests when the import is missing. --- pyproject.toml | 1 + tests/system/providers/amazon/aws/tests/test_aws_auth_manager.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3c62b110608..7c466743a93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -387,6 +387,7 @@ combine-as-imports = true "airflow/security/kerberos.py" = ["E402"] "airflow/security/utils.py" = ["E402"] "tests/providers/amazon/aws/auth_manager/security_manager/test_aws_security_manager_override.py" = ["E402"] +"tests/system/providers/amazon/aws/tests/test_aws_auth_manager.py" = ["E402"] "tests/providers/common/io/xcom/test_backend.py" = ["E402"] "tests/providers/elasticsearch/log/elasticmock/__init__.py" = ["E402"] "tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py" = ["E402"] diff --git a/tests/system/providers/amazon/aws/tests/test_aws_auth_manager.py b/tests/system/providers/amazon/aws/tests/test_aws_auth_manager.py index 44c0bcecc3b..792df7b155d 100644 --- a/tests/system/providers/amazon/aws/tests/test_aws_auth_manager.py +++ b/tests/system/providers/amazon/aws/tests/test_aws_auth_manager.py @@ -22,13 +22,13 @@ from unittest.mock import Mock, patch import boto3 import pytest +pytest.importorskip("onelogin") + from airflow.www import app as 
application from tests.system.providers.amazon.aws.utils import set_env_id from tests.test_utils.config import conf_vars from tests.test_utils.www import check_content_in_response -pytest.importorskip("onelogin") - SAML_METADATA_URL = "/saml/metadata" SAML_METADATA_PARSED = { "idp": {
(airflow) branch main updated: Rename `--extras` to `--airflow-extras` from `breeze prod-image build` command (#42077)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 1da4b146e9 Rename `--extras` to `--airflow-extras` from `breeze prod-image build` command (#42077) 1da4b146e9 is described below commit 1da4b146e954f78280dbe7bbbef452d58c8f728c Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Fri Sep 6 17:24:18 2024 -0400 Rename `--extras` to `--airflow-extras` from `breeze prod-image build` command (#42077) --- dev/breeze/doc/ci/02_images.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/breeze/doc/ci/02_images.md b/dev/breeze/doc/ci/02_images.md index 19c58ebc2d..6dfa8f350f 100644 --- a/dev/breeze/doc/ci/02_images.md +++ b/dev/breeze/doc/ci/02_images.md @@ -126,14 +126,14 @@ By adding `--python ` parameter you can build the image version for the chosen Python version. The images are built with default extras - different extras for CI and -production image and you can change the extras via the `--extras` +production image and you can change the extras via the `--airflow-extras` parameters and add new ones with `--additional-airflow-extras`. For example if you want to build Python 3.8 version of production image with "all" extras installed you should run this command: ``` bash -breeze prod-image build --python 3.8 --extras "all" +breeze prod-image build --python 3.8 --airflow-extras "all" ``` If you just want to add new extras you can add them like that:
(airflow) branch main updated: Use base aws classes in AWS Glue DataBrew Operators/Triggers (#41848)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 12bb8b3524 Use base aws classes in AWS Glue DataBrew Operators/Triggers (#41848) 12bb8b3524 is described below commit 12bb8b35241f0915e82a322c7905c8602df95a7f Author: GPK AuthorDate: Thu Sep 5 15:17:03 2024 +0100 Use base aws classes in AWS Glue DataBrew Operators/Triggers (#41848) --- airflow/providers/amazon/CHANGELOG.rst | 2 +- .../amazon/aws/operators/glue_databrew.py | 70 -- .../providers/amazon/aws/triggers/glue_databrew.py | 43 ++--- .../operators/glue_databrew.rst| 5 ++ .../amazon/aws/operators/test_glue_databrew.py | 44 ++ .../amazon/aws/triggers/test_glue_databrew.py | 22 +++ .../providers/amazon/aws/example_glue_databrew.py | 2 +- 7 files changed, 161 insertions(+), 27 deletions(-) diff --git a/airflow/providers/amazon/CHANGELOG.rst b/airflow/providers/amazon/CHANGELOG.rst index 32ac7cce53..cdb23e2485 100644 --- a/airflow/providers/amazon/CHANGELOG.rst +++ b/airflow/providers/amazon/CHANGELOG.rst @@ -34,7 +34,7 @@ Changelog `Apache Airflow providers support policy <https://github.com/apache/airflow/blob/main/PROVIDERS.rst#minimum-supported-version-of-airflow-for-community-managed-providers>`_. .. warning:: When deferrable mode was introduced for ``RedshiftDataOperator``, in version 8.17.0, tasks configured with - ``deferrable=True`` and ``wait_for_completion=True`` wouldn't enter the deferred state. Instead, the task would occupy + ``deferrable=True`` and ``wait_for_completion=True`` would not enter the deferred state. Instead, the task would occupy an executor slot until the statement was completed. A workaround may have been to set ``wait_for_completion=False``. In this version, tasks set up with ``wait_for_completion=False`` will not wait anymore, regardless of the value of ``deferrable``. 
diff --git a/airflow/providers/amazon/aws/operators/glue_databrew.py b/airflow/providers/amazon/aws/operators/glue_databrew.py index 8ea24d49ff..e4d774ae40 100644 --- a/airflow/providers/amazon/aws/operators/glue_databrew.py +++ b/airflow/providers/amazon/aws/operators/glue_databrew.py @@ -17,20 +17,22 @@ # under the License. from __future__ import annotations -from functools import cached_property +import warnings from typing import TYPE_CHECKING, Any, Sequence from airflow.configuration import conf -from airflow.models import BaseOperator +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook +from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator from airflow.providers.amazon.aws.triggers.glue_databrew import GlueDataBrewJobCompleteTrigger from airflow.providers.amazon.aws.utils import validate_execute_complete_event +from airflow.providers.amazon.aws.utils.mixins import aws_template_fields if TYPE_CHECKING: from airflow.utils.context import Context -class GlueDataBrewStartJobOperator(BaseOperator): +class GlueDataBrewStartJobOperator(AwsBaseOperator[GlueDataBrewHook]): """ Start an AWS Glue DataBrew job. @@ -47,36 +49,55 @@ class GlueDataBrewStartJobOperator(BaseOperator): :param deferrable: If True, the operator will wait asynchronously for the job to complete. This implies waiting for completion. This mode requires aiobotocore module to be installed. (default: False) -:param delay: Time in seconds to wait between status checks. Default is 30. +:param delay: Time in seconds to wait between status checks. (Deprecated). +:param waiter_delay: Time in seconds to wait between status checks. Default is 30. +:param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 60) :return: dictionary with key run_id and value of the resulting job's run_id. 
+ +:param aws_conn_id: The Airflow connection used for AWS credentials. +If this is ``None`` or empty then the default boto3 behaviour is used. If +running Airflow in a distributed manner and aws_conn_id is None or +empty, then default boto3 configuration would be used (and must be +maintained on each worker node). +:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used. +:param verify: Whether or not to verify SSL certificates. See: + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html +:param botocore_config: Configuration dictionary (key-values) for botocore client. See: + https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html """
(airflow) branch main updated: Provider fab auth manager deprecated methods removed (#41720)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new b0391838c1 Provider fab auth manager deprecated methods removed (#41720) b0391838c1 is described below commit b0391838c142bebdf178ba030c45db16b1f1f33b Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Mon Aug 26 20:21:48 2024 +0530 Provider fab auth manager deprecated methods removed (#41720) --- .../fab/auth_manager/security_manager/override.py | 101 + newsfragments/41720.significant.rst| 5 + tests/providers/fab/auth_manager/test_security.py | 30 -- 3 files changed, 8 insertions(+), 128 deletions(-) diff --git a/airflow/providers/fab/auth_manager/security_manager/override.py b/airflow/providers/fab/auth_manager/security_manager/override.py index 86d76de76f..85b78ae879 100644 --- a/airflow/providers/fab/auth_manager/security_manager/override.py +++ b/airflow/providers/fab/auth_manager/security_manager/override.py @@ -23,8 +23,7 @@ import logging import os import random import uuid -import warnings -from typing import TYPE_CHECKING, Any, Callable, Collection, Container, Iterable, Sequence +from typing import Any, Callable, Collection, Iterable, Sequence import jwt import packaging.version @@ -69,13 +68,12 @@ from itsdangerous import want_bytes from markupsafe import Markup from sqlalchemy import and_, func, inspect, literal, or_, select from sqlalchemy.exc import MultipleResultsFound -from sqlalchemy.orm import Session, joinedload +from sqlalchemy.orm import joinedload from werkzeug.security import check_password_hash, generate_password_hash from airflow import __version__ as airflow_version -from airflow.auth.managers.utils.fab import get_method_from_fab_action_map from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, 
RemovedInAirflow3Warning +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import DagBag, DagModel from airflow.providers.fab.auth_manager.models import ( Action, @@ -108,14 +106,10 @@ from airflow.providers.fab.auth_manager.views.user_edit import ( ) from airflow.providers.fab.auth_manager.views.user_stats import CustomUserStatsChartView from airflow.security import permissions -from airflow.utils.session import NEW_SESSION, provide_session from airflow.www.extensions.init_auth_manager import get_auth_manager from airflow.www.security_manager import AirflowSecurityManagerV2 from airflow.www.session import AirflowDatabaseSessionInterface -if TYPE_CHECKING: -from airflow.auth.managers.base_auth_manager import ResourceMethod - log = logging.getLogger(__name__) # This is the limit of DB user sessions that we consider as "healthy". If you have more sessions that this @@ -962,70 +956,6 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): log.exception(const.LOGMSG_ERR_SEC_CREATE_DB) exit(1) -def get_readable_dags(self, user) -> Iterable[DagModel]: -"""Get the DAGs readable by authenticated user.""" -warnings.warn( -"`get_readable_dags` has been deprecated. Please use `get_auth_manager().get_permitted_dag_ids` " -"instead.", -RemovedInAirflow3Warning, -stacklevel=2, -) -with warnings.catch_warnings(): -warnings.simplefilter("ignore", RemovedInAirflow3Warning) -return self.get_accessible_dags([permissions.ACTION_CAN_READ], user) - -def get_editable_dags(self, user) -> Iterable[DagModel]: -"""Get the DAGs editable by authenticated user.""" -warnings.warn( -"`get_editable_dags` has been deprecated. 
Please use `get_auth_manager().get_permitted_dag_ids` " -"instead.", -RemovedInAirflow3Warning, -stacklevel=2, -) -with warnings.catch_warnings(): -warnings.simplefilter("ignore", RemovedInAirflow3Warning) -return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user) - -@provide_session -def get_accessible_dags( -self, -user_actions: Container[str] | None, -user, -session: Session = NEW_SESSION, -) -> Iterable[DagModel]: -warnings.warn( -"`get_accessible_dags` has been deprecated. Please use " -"`get_auth_manager().get_permitted_dag_ids` instead.", -RemovedInAirflow3Warning, -stacklevel=3, -) - -dag_ids = self.get_accessible_dag_ids(user, user_actions, s
(airflow) branch main updated (d6f820d5bc -> 761ad6ec13)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from d6f820d5bc Bump micromatch from 4.0.5 to 4.0.8 in /airflow/www (#41726) add 761ad6ec13 Remove Airflow 2.7 support from Kubernetes Provider (#41746) No new revisions were added by this update. Summary of changes: .../kubernetes/executors/kubernetes_executor_utils.py | 18 +- 1 file changed, 5 insertions(+), 13 deletions(-)
(airflow) branch main updated: Handle executor events in `dag.test()` (#41625)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new aa9da93a8c Handle executor events in `dag.test()` (#41625) aa9da93a8c is described below commit aa9da93a8cd150dd386af7d3524d79c7c201bdc9 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Thu Aug 22 09:33:28 2024 -0400 Handle executor events in `dag.test()` (#41625) --- airflow/jobs/scheduler_job_runner.py | 37 +++- airflow/models/dag.py| 10 ++ 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index baf91dc40d..b662c3215a 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -739,9 +739,24 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): session.bulk_save_objects(objects=objects, preserve_order=False) def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int: -"""Respond to executor events.""" if not self._standalone_dag_processor and not self.processor_agent: raise ValueError("Processor agent is not started.") + +return SchedulerJobRunner.process_executor_events( +executor=executor, dag_bag=self.dagbag, job_id=self.job.id, session=session +) + +@classmethod +def process_executor_events( +cls, executor: BaseExecutor, dag_bag: DagBag, job_id: str | None, session: Session +) -> int: +""" +Respond to executor events. + +This is a classmethod because this is also used in `dag.test()`. +`dag.test` execute DAGs with no scheduler, therefore it needs to handle the events pushed by the +executors as well. 
+""" ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] = {} event_buffer = executor.get_event_buffer() tis_with_right_state: list[TaskInstanceKey] = [] @@ -751,7 +766,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): # We create map (dag_id, task_id, execution_date) -> in-memory try_number ti_primary_key_to_try_number_map[ti_key.primary] = ti_key.try_number -self.log.info("Received executor event with state %s for task instance %s", state, ti_key) +cls.logger().info("Received executor event with state %s for task instance %s", state, ti_key) if state in ( TaskInstanceState.FAILED, TaskInstanceState.SUCCESS, @@ -778,7 +793,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING): ti.external_executor_id = info -self.log.info("Setting external_id for %s to %s", ti, info) +cls.logger().info("Setting external_id for %s to %s", ti, info) continue msg = ( @@ -788,7 +803,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): "job_id=%s, pool=%s, queue=%s, priority_weight=%d, operator=%s, queued_dttm=%s, " "queued_by_job_id=%s, pid=%s" ) -self.log.info( +cls.logger().info( msg, ti.dag_id, ti.task_id, @@ -876,9 +891,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): # All of this could also happen if the state is "running", # but that is handled by the zombie detection. 
-ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED +ti_queued = ti.try_number == buffer_key.try_number and ti.state in ( +TaskInstanceState.SCHEDULED, +TaskInstanceState.QUEUED, +TaskInstanceState.RUNNING, +) ti_requeued = ( -ti.queued_by_job_id != self.job.id # Another scheduler has queued this task again +ti.queued_by_job_id != job_id # Another scheduler has queued this task again or executor.has_task(ti) # This scheduler has this task already ) @@ -894,15 +913,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): ) if info is not None: msg += " Extra info: %s" % info # noqa: RUF100, UP031, flynt -self.log.error(msg) +cls.logger().error(msg) session.add(Log(event="state mismatch", extra=msg, task_instance=ti.key))
(airflow) branch main updated: secrets backend deprecated methods removed (#41642)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 73eb9bf5ab secrets backend deprecated methods removed (#41642) 73eb9bf5ab is described below commit 73eb9bf5ab1544c4fd26de22725a03cb5a305185 Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Thu Aug 22 19:02:17 2024 +0530 secrets backend deprecated methods removed (#41642) --- airflow/secrets/base_secrets.py | 55 +--- airflow/secrets/environment_variables.py | 18 --- airflow/secrets/metastore.py | 15 - newsfragments/41642.significant.rst | 3 ++ 4 files changed, 4 insertions(+), 87 deletions(-) diff --git a/airflow/secrets/base_secrets.py b/airflow/secrets/base_secrets.py index 3346d880f2..329eb95cb3 100644 --- a/airflow/secrets/base_secrets.py +++ b/airflow/secrets/base_secrets.py @@ -16,12 +16,9 @@ # under the License. from __future__ import annotations -import warnings from abc import ABC from typing import TYPE_CHECKING -from airflow.exceptions import RemovedInAirflow3Warning - if TYPE_CHECKING: from airflow.models.connection import Connection @@ -69,17 +66,6 @@ class BaseSecretsBackend(ABC): else: return Connection(conn_id=conn_id, uri=value) -def get_conn_uri(self, conn_id: str) -> str | None: -""" -Get conn_uri from Secrets Backend. - -This method is deprecated and will be removed in a future release; implement ``get_conn_value`` -instead. - -:param conn_id: connection id -""" -raise NotImplementedError() - def get_connection(self, conn_id: str) -> Connection | None: """ Return connection object with a given ``conn_id``. 
@@ -88,52 +74,13 @@ class BaseSecretsBackend(ABC): :param conn_id: connection id """ -value = None - -not_implemented_get_conn_value = False -# TODO: after removal of ``get_conn_uri`` we should not catch NotImplementedError here -try: -value = self.get_conn_value(conn_id=conn_id) -except NotImplementedError: -not_implemented_get_conn_value = True -warnings.warn( -"Method `get_conn_uri` is deprecated. Please use `get_conn_value`.", -RemovedInAirflow3Warning, -stacklevel=2, -) - -if not_implemented_get_conn_value: -try: -value = self.get_conn_uri(conn_id=conn_id) -except NotImplementedError: -raise NotImplementedError( -f"Secrets backend {self.__class__.__name__} neither implements " -"`get_conn_value` nor `get_conn_uri`. Method `get_conn_uri` is " -"deprecated and will be removed in a future release. Please implement `get_conn_value`." -) +value = self.get_conn_value(conn_id=conn_id) if value: return self.deserialize_connection(conn_id=conn_id, value=value) else: return None -def get_connections(self, conn_id: str) -> list[Connection]: -""" -Return connection object with a given ``conn_id``. - -:param conn_id: connection id -""" -warnings.warn( -"This method is deprecated. Please use " - "`airflow.secrets.base_secrets.BaseSecretsBackend.get_connection`.", -RemovedInAirflow3Warning, -stacklevel=2, -) -conn = self.get_connection(conn_id=conn_id) -if conn: -return [conn] -return [] - def get_variable(self, key: str) -> str | None: """ Return value for Airflow Variable. 
diff --git a/airflow/secrets/environment_variables.py b/airflow/secrets/environment_variables.py index 69d3174b72..e6bd72d4e5 100644 --- a/airflow/secrets/environment_variables.py +++ b/airflow/secrets/environment_variables.py @@ -20,9 +20,7 @@ from __future__ import annotations import os -import warnings -from airflow.exceptions import RemovedInAirflow3Warning from airflow.secrets import BaseSecretsBackend CONN_ENV_PREFIX = "AIRFLOW_CONN_" @@ -32,22 +30,6 @@ VAR_ENV_PREFIX = "AIRFLOW_VAR_" class EnvironmentVariablesBackend(BaseSecretsBackend): """Retrieves Connection object and Variable from environment variable.""" -def get_conn_uri(self, conn_id: str) -> str | None: -""" -Return URI representation of Connection conn_id. - -:param conn_id
(airflow) branch main updated (4eb71d60ca -> 7e2c48cae5)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 4eb71d60ca add kerberos env to base container env, add webserver-config volume) (#41645) add 7e2c48cae5 Update `example_emr_eks` system test to run `eksctl` in one task (#41654) No new revisions were added by this update. Summary of changes: .../system/providers/amazon/aws/example_emr_eks.py | 29 -- 1 file changed, 5 insertions(+), 24 deletions(-)
(airflow) branch main updated: fix: select_query should have precedence over default query in RedshiftToS3Operator (#41634)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 410b57795b fix: select_query should have precedence over default query in RedshiftToS3Operator (#41634) 410b57795b is described below commit 410b57795b37f3e22e9920499feec22709f49427 Author: Kacper Muda AuthorDate: Wed Aug 21 16:18:06 2024 +0200 fix: select_query should have precedence over default query in RedshiftToS3Operator (#41634) --- .../amazon/aws/transfers/redshift_to_s3.py | 20 ++-- .../amazon/aws/transfers/test_redshift_to_s3.py| 117 + 2 files changed, 130 insertions(+), 7 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py index 73578ea539..ef3cebdae9 100644 --- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py @@ -44,11 +44,12 @@ class RedshiftToS3Operator(BaseOperator): :param s3_bucket: reference to a specific S3 bucket :param s3_key: reference to a specific S3 key. If ``table_as_file_name`` is set to False, this param must include the desired file name -:param schema: reference to a specific schema in redshift database -Applicable when ``table`` param provided. -:param table: reference to a specific table in redshift database -Used when ``select_query`` param not provided. 
-:param select_query: custom select query to fetch data from redshift database +:param schema: reference to a specific schema in redshift database, +used when ``table`` param provided and ``select_query`` param not provided +:param table: reference to a specific table in redshift database, +used when ``schema`` param provided and ``select_query`` param not provided +:param select_query: custom select query to fetch data from redshift database, +has precedence over default query `SELECT * FROM ``schema``.``table`` :param redshift_conn_id: reference to a specific redshift database :param aws_conn_id: reference to a specific S3 connection If the AWS connection contains 'aws_iam_role' in ``extras`` @@ -138,12 +139,17 @@ class RedshiftToS3Operator(BaseOperator): {unload_options}; """ +@property +def default_select_query(self) -> str | None: +if self.schema and self.table: +return f"SELECT * FROM {self.schema}.{self.table}" +return None + def execute(self, context: Context) -> None: if self.table and self.table_as_file_name: self.s3_key = f"{self.s3_key}/{self.table}_" -if self.schema and self.table: -self.select_query = f"SELECT * FROM {self.schema}.{self.table}" +self.select_query = self.select_query or self.default_select_query if self.select_query is None: raise ValueError( diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py index d2af90a445..2d28acd22e 100644 --- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py +++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py @@ -305,6 +305,123 @@ class TestRedshiftToS3Transfer: assert_equal_ignore_multiple_spaces(mock_run.call_args.args[0], unload_query) assert f"UNLOAD ($${expected_query}$$)" in unload_query +@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection") +@mock.patch("airflow.models.connection.Connection") +@mock.patch("boto3.session.Session") + 
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run") +def test_custom_select_query_has_precedence_over_table_and_schema( +self, +mock_run, +mock_session, +mock_connection, +mock_hook, +): +access_key = "aws_access_key_id" +secret_key = "aws_secret_access_key" +mock_session.return_value = Session(access_key, secret_key) +mock_session.return_value.access_key = access_key +mock_session.return_value.secret_key = secret_key +mock_session.return_value.token = None +mock_connection.return_value = Connection() +mock_hook.return_value = Connection() +s3_bucket = "bucket" +s3_key = "key" +unload_options = [ +"HEADER", +] +select_query = "select column from table" + +op = RedshiftToS3Operator( +select_query=select_query, +table="table", +schema="schema"
(airflow) branch main updated: Wait sensor async (#41557)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 50e2ecb2b8 Wait sensor async (#41557) 50e2ecb2b8 is described below commit 50e2ecb2b8693e558572cb5f073aff9d8a7a6570 Author: Collin McNulty AuthorDate: Tue Aug 20 14:50:54 2024 -0500 Wait sensor async (#41557) --- airflow/sensors/time_delta.py| 39 ++- tests/sensors/test_time_delta.py | 23 +-- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index d068fad9bf..dc78a0e33b 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -17,11 +17,14 @@ # under the License. from __future__ import annotations +from datetime import timedelta +from time import sleep from typing import TYPE_CHECKING, Any, NoReturn +from airflow.configuration import conf from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator -from airflow.triggers.temporal import DateTimeTrigger +from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone if TYPE_CHECKING: @@ -89,3 +92,37 @@ class TimeDeltaSensorAsync(TimeDeltaSensor): def execute_complete(self, context: Context, event: Any = None) -> None: """Handle the event when the trigger fires and return immediately.""" return None + + +class WaitSensor(BaseSensorOperator): +""" +A sensor that waits a specified period of time before completing. + +This differs from TimeDeltaSensor because the time to wait is measured from the start of the task, not +the data_interval_end of the DAG run. + +:param time_to_wait: time length to wait after the task starts before succeeding. 
+:param deferrable: Run sensor in deferrable mode +""" + +def __init__( +self, +time_to_wait: timedelta | int, +deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), +**kwargs, +) -> None: +super().__init__(**kwargs) +self.deferrable = deferrable +if isinstance(time_to_wait, int): +self.time_to_wait = timedelta(minutes=time_to_wait) +else: +self.time_to_wait = time_to_wait + +def execute(self, context: Context) -> None: +if self.deferrable: +self.defer( +trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True), +method_name="execute_complete", +) +else: +sleep(int(self.time_to_wait.total_seconds())) diff --git a/tests/sensors/test_time_delta.py b/tests/sensors/test_time_delta.py index b437937df2..408a3c8828 100644 --- a/tests/sensors/test_time_delta.py +++ b/tests/sensors/test_time_delta.py @@ -22,15 +22,15 @@ from unittest import mock import pendulum import pytest +import time_machine from airflow.models import DagBag from airflow.models.dag import DAG -from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync +from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync, WaitSensor from airflow.utils.timezone import datetime pytestmark = pytest.mark.db_test - DEFAULT_DATE = datetime(2015, 1, 1) DEV_NULL = "/dev/null" TEST_DAG_ID = "unit_tests" @@ -71,3 +71,22 @@ class TestTimeDeltaSensorAsync: defer_mock.assert_called_once() else: defer_mock.assert_not_called() + +@pytest.mark.parametrize( +"should_defer", +[False, True], +) +@mock.patch("airflow.models.baseoperator.BaseOperator.defer") +@mock.patch("airflow.sensors.time_delta.sleep") +def test_wait_sensor(self, sleep_mock, defer_mock, should_defer): +wait_time = timedelta(seconds=30) +op = WaitSensor( +task_id="wait_sensor_check", time_to_wait=wait_time, dag=self.dag, deferrable=should_defer +) +with time_machine.travel(pendulum.datetime(year=2024, month=8, day=1, tz="UTC"), tick=False): +op.execute({}) +if should_defer: 
+defer_mock.assert_called_once() +else: +defer_mock.assert_not_called() +sleep_mock.assert_called_once_with(30)
(airflow) branch main updated: Fix: Keep compatibility with old FAB versions (#41549)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new d7d944e381 Fix: Keep compatibility with old FAB versions (#41549) d7d944e381 is described below commit d7d944e3818baf98c310314e369d767866012939 Author: Joao Amaral <7281460+joaopama...@users.noreply.github.com> AuthorDate: Tue Aug 20 16:42:05 2024 -0300 Fix: Keep compatibility with old FAB versions (#41549) --- airflow/models/dag.py| 30 + tests/models/test_dag.py | 57 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 518b367067..b685b28343 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -57,6 +57,7 @@ import pendulum import re2 import sqlalchemy_jsonfield from dateutil.relativedelta import relativedelta +from packaging import version as packaging_version from sqlalchemy import ( Boolean, Column, @@ -116,6 +117,7 @@ from airflow.models.taskinstance import ( clear_task_instances, ) from airflow.models.tasklog import LogTemplate +from airflow.providers.fab import __version__ as FAB_VERSION from airflow.secrets.local_filesystem import LocalFilesystemBackend from airflow.security import permissions from airflow.settings import json @@ -936,16 +938,26 @@ class DAG(LoggingMixin): updated_access_control = {} for role, perms in access_control.items(): -updated_access_control[role] = updated_access_control.get(role, {}) -if isinstance(perms, (set, list)): -# Support for old-style access_control where only the actions are specified -updated_access_control[role][permissions.RESOURCE_DAG] = set(perms) +if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0"): +updated_access_control[role] = updated_access_control.get(role, {}) +if isinstance(perms, (set, list)): +# Support for old-style access_control where only the actions are specified 
+updated_access_control[role][permissions.RESOURCE_DAG] = set(perms) +else: +updated_access_control[role] = perms +if permissions.RESOURCE_DAG in updated_access_control[role]: +updated_access_control[role][permissions.RESOURCE_DAG] = { +update_old_perm(perm) +for perm in updated_access_control[role][permissions.RESOURCE_DAG] +} +elif isinstance(perms, dict): +# Not allow new access control format with old FAB versions +raise AirflowException( +"Please upgrade the FAB provider to a version >= 1.3.0 to allow " +"use the Dag Level Access Control new format." +) else: -updated_access_control[role] = perms -if permissions.RESOURCE_DAG in updated_access_control[role]: -updated_access_control[role][permissions.RESOURCE_DAG] = { -update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG] -} +updated_access_control[role] = {update_old_perm(perm) for perm in perms} return updated_access_control diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 0bd58ddc52..4994e4545d 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2539,6 +2539,63 @@ my_postgres_conn: assert "permission is deprecated" in str(deprecation_warnings[0].message) assert "permission is deprecated" in str(deprecation_warnings[1].message) +@pytest.mark.parametrize( +"fab_version, perms, expected_exception, expected_perms", +[ +pytest.param( +"1.2.0", +{ +"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, +"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, +# will raise error in old FAB with new access control format +}, +AirflowException, +None, +id="old_fab_new_access_control_format", +), +pytest.param( +"1.2.0", +{ +"role1": [ +permissions.ACTION_CAN_READ, +permissions.ACTION_CAN_EDIT, +permissions.ACTION_CAN_READ, +], +},
(airflow) branch main updated: Add fixes by breeze/precommit-lint static checks (#41604) (#41618)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new c78a004210 Add fixes by breeze/precommit-lint static checks (#41604) (#41618) c78a004210 is described below commit c78a0042100ea7330c1fbc7ac234306e09d4678e Author: Omkar P <45419097+omkar-f...@users.noreply.github.com> AuthorDate: Wed Aug 21 01:10:03 2024 +0530 Add fixes by breeze/precommit-lint static checks (#41604) (#41618) --- .../providers/fab/auth_manager/cli_commands/user_command.py | 11 ++- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/airflow/providers/fab/auth_manager/cli_commands/user_command.py b/airflow/providers/fab/auth_manager/cli_commands/user_command.py index 3050a9e250..5853dcf1a6 100644 --- a/airflow/providers/fab/auth_manager/cli_commands/user_command.py +++ b/airflow/providers/fab/auth_manager/cli_commands/user_command.py @@ -212,10 +212,12 @@ def users_import(args): users_created, users_updated = _import_users(users_list) if users_created: -print("Created the following users:\n\t{}".format("\n\t".join(users_created))) +users_created_str = "\n\t".join(users_created) +print(f"Created the following users:\n\t{users_created_str}") if users_updated: -print("Updated the following users:\n\t{}".format("\n\t".join(users_updated))) +users_updated_str = "\n\t".join(users_updated) +print(f"Updated the following users:\n\t{users_updated_str}") def _import_users(users_list: list[dict[str, Any]]): @@ -231,9 +233,8 @@ def _import_users(users_list: list[dict[str, Any]]): msg.append(f"[Item {row_num}]") for key, value in failure.items(): msg.append(f"\t{key}: {value}") -raise SystemExit( -"Error: Input file didn't pass validation. See below:\n{}".format("\n".join(msg)) -) +msg_str = "\n".join(msg) +raise SystemExit(f"Error: Input file didn't pass validation. 
See below:\n{msg_str}") for user in users_list: roles = []
(airflow) branch main updated (1b602d5026 -> c87950173a)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 1b602d5026 Limit watchtower as depenendcy as 3.3.0 breaks moin. (#41612) add c87950173a custom dependency detector removal (#41609) No new revisions were added by this update. Summary of changes: airflow/serialization/serialized_objects.py | 27 +- newsfragments/41609.significant.rst | 1 + tests/serialization/test_dag_serialization.py | 52 --- 3 files changed, 2 insertions(+), 78 deletions(-) create mode 100644 newsfragments/41609.significant.rst
(airflow) branch main updated: Openlineage s3 to redshift operator integration (#41575)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new a2b8696bdb Openlineage s3 to redshift operator integration (#41575) a2b8696bdb is described below commit a2b8696bdb6e99b27e40e9d8bce04dd2ee1789a7 Author: Artur Skarżyński <33717106+artu...@users.noreply.github.com> AuthorDate: Mon Aug 19 16:11:54 2024 +0200 Openlineage s3 to redshift operator integration (#41575) - Co-authored-by: Vincent <97131062+vincb...@users.noreply.github.com> --- .../amazon/aws/transfers/s3_to_redshift.py | 101 ++- airflow/providers/amazon/aws/utils/openlineage.py | 136 + .../amazon/aws/transfers/test_s3_to_redshift.py| 308 + .../providers/amazon/aws/utils/test_openlineage.py | 168 +++ 4 files changed, 703 insertions(+), 10 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py index 161276b33c..653885b541 100644 --- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py +++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py @@ -121,6 +121,10 @@ class S3ToRedshiftOperator(BaseOperator): if arg in self.redshift_data_api_kwargs: raise AirflowException(f"Cannot include param '{arg}' in Redshift Data API kwargs") +@property +def use_redshift_data(self): +return bool(self.redshift_data_api_kwargs) + def _build_copy_query( self, copy_destination: str, credentials_block: str, region_info: str, copy_options: str ) -> str: @@ -138,11 +142,11 @@ class S3ToRedshiftOperator(BaseOperator): if self.method not in AVAILABLE_METHODS: raise AirflowException(f"Method not found! 
Available methods: {AVAILABLE_METHODS}") -redshift_hook: RedshiftDataHook | RedshiftSQLHook -if self.redshift_data_api_kwargs: -redshift_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id) +if self.use_redshift_data: +redshift_data_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id) else: -redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id) +redshift_sql_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id) + conn = S3Hook.get_connection(conn_id=self.aws_conn_id) if self.aws_conn_id else None region_info = "" if conn and conn.extra_dejson.get("region", False): @@ -167,12 +171,12 @@ class S3ToRedshiftOperator(BaseOperator): if self.method == "REPLACE": sql = ["BEGIN;", f"DELETE FROM {destination};", copy_statement, "COMMIT"] elif self.method == "UPSERT": -if isinstance(redshift_hook, RedshiftDataHook): -keys = self.upsert_keys or redshift_hook.get_table_primary_key( +if self.use_redshift_data: +keys = self.upsert_keys or redshift_data_hook.get_table_primary_key( table=self.table, schema=self.schema, **self.redshift_data_api_kwargs ) else: -keys = self.upsert_keys or redshift_hook.get_table_primary_key(self.table, self.schema) +keys = self.upsert_keys or redshift_sql_hook.get_table_primary_key(self.table, self.schema) if not keys: raise AirflowException( f"No primary key on {self.schema}.{self.table}. 
Please provide keys on 'upsert_keys'" @@ -192,8 +196,85 @@ class S3ToRedshiftOperator(BaseOperator): sql = copy_statement self.log.info("Executing COPY command...") -if isinstance(redshift_hook, RedshiftDataHook): -redshift_hook.execute_query(sql=sql, **self.redshift_data_api_kwargs) +if self.use_redshift_data: +redshift_data_hook.execute_query(sql=sql, **self.redshift_data_api_kwargs) else: -redshift_hook.run(sql, autocommit=self.autocommit) +redshift_sql_hook.run(sql, autocommit=self.autocommit) self.log.info("COPY command complete...") + +def get_openlineage_facets_on_complete(self, task_instance): +"""Implement on_complete as we will query destination table.""" +from pathlib import Path + +from airflow.providers.amazon.aws.utils.openlineage import ( +get_facets_from_redshift_table, +get_identity_column_lineage_facet, +) +from airflow.providers.common.compat.openlineage.facet import ( +Dataset, +Identifier, +Lifecyc
(airflow) branch main updated: Partial fix for example_dynamodb_to_s3.py (#41517)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new f2b7bb8803 Partial fix for example_dynamodb_to_s3.py (#41517) f2b7bb8803 is described below commit f2b7bb880345aeaaebdbb5ef48a6ef3bb87d821e Author: D. Ferruzzi AuthorDate: Fri Aug 16 04:17:33 2024 -0700 Partial fix for example_dynamodb_to_s3.py (#41517) `backup_db_to_point_in_time_incremental_export` still fails with "Incremental export period from time should not be greater or equal to incremental export period to time" --- airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py| 2 +- tests/system/providers/amazon/aws/example_dynamodb_to_s3.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index c40001bbff..f7670cfd83 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -148,7 +148,7 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): self.point_in_time_export = point_in_time_export self.export_time = export_time self.export_format = export_format -self.export_table_to_point_in_time_kwargs = export_table_to_point_in_time_kwargs +self.export_table_to_point_in_time_kwargs = export_table_to_point_in_time_kwargs or {} self.check_interval = check_interval self.max_attempts = max_attempts diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py index c39abd3b80..225aaa7803 100644 --- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py +++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py @@ -225,6 +225,7 @@ with DAG( backup_db_segment_1, backup_db_segment_2, export_time, +latest_export_time, 
backup_db_to_point_in_time_full_export, backup_db_to_point_in_time_incremental_export, # TEST TEARDOWN
(airflow) branch main updated: Add incremental export and cross account export functionality in `DynamoDBToS3Operator` (#41304)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new a70ee7209c Add incremental export and cross account export functionality in `DynamoDBToS3Operator` (#41304) a70ee7209c is described below commit a70ee7209cd8d4dabb8bc1b1057d79ff25a99bae Author: Steven Shidi Zhou AuthorDate: Thu Aug 15 18:56:32 2024 +0200 Add incremental export and cross account export functionality in `DynamoDBToS3Operator` (#41304) - Co-authored-by: Vincent <97131062+vincb...@users.noreply.github.com> --- .../amazon/aws/transfers/dynamodb_to_s3.py | 51 +- .../transfer/dynamodb_to_s3.rst| 16 +-- .../amazon/aws/transfers/test_dynamodb_to_s3.py| 2 + .../providers/amazon/aws/example_dynamodb_to_s3.py | 45 --- 4 files changed, 94 insertions(+), 20 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index 81f0dd79b6..c40001bbff 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator +from airflow.utils.helpers import prune_dict if TYPE_CHECKING: from airflow.utils.context import Context @@ -89,10 +90,13 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan> :param s3_key_prefix: Prefix of s3 object key :param process_func: How we transform a dynamodb item to bytes. 
By default, we dump the json +:param point_in_time_export: Boolean value indicating the operator to use 'scan' or 'point in time export' :param export_time: Time in the past from which to export table data, counted in seconds from the start of the Unix epoch. The table export will be a snapshot of the table's state at this point in time. :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON or ION. +:param export_table_to_point_in_time_kwargs: extra parameters for the boto3 +`export_table_to_point_in_time` function all. e.g. `ExportType`, `IncrementalExportSpecification` :param check_interval: The amount of time in seconds to wait between attempts. Only if ``export_time`` is provided. :param max_attempts: The maximum number of attempts to be made. Only if ``export_time`` is provided. @@ -107,12 +111,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): "s3_key_prefix", "export_time", "export_format", +"export_table_to_point_in_time_kwargs", "check_interval", "max_attempts", ) template_fields_renderers = { "dynamodb_scan_kwargs": "json", +"export_table_to_point_in_time_kwargs": "json", } def __init__( @@ -120,12 +126,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): *, dynamodb_table_name: str, s3_bucket_name: str, -file_size: int, +file_size: int = 1000, dynamodb_scan_kwargs: dict[str, Any] | None = None, s3_key_prefix: str = "", process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes, +point_in_time_export: bool = False, export_time: datetime | None = None, export_format: str = "DYNAMODB_JSON", +export_table_to_point_in_time_kwargs: dict | None = None, check_interval: int = 30, max_attempts: int = 60, **kwargs, @@ -137,8 +145,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): self.dynamodb_scan_kwargs = dynamodb_scan_kwargs self.s3_bucket_name = s3_bucket_name self.s3_key_prefix = s3_key_prefix +self.point_in_time_export = point_in_time_export self.export_time = export_time 
self.export_format = export_format +self.export_table_to_point_in_time_kwargs = export_table_to_point_in_time_kwargs self.check_interval = check_interval self.max_attempts = max_attempts @@ -148,29 +158,50 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): return DynamoDBHook(aws_conn_id=self.source_aws_conn_id) def execute(self, context: Context) -> None: -if self.export_ti
(airflow) branch main updated: Remove deprecated code is AWS provider (#41407)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new c8fc0ade09 Remove deprecated code is AWS provider (#41407) c8fc0ade09 is described below commit c8fc0ade0901ff7f08c45545f56caeb1a7d96bd1 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Thu Aug 15 11:20:56 2024 -0400 Remove deprecated code is AWS provider (#41407) --- airflow/providers/amazon/aws/sensors/athena.py | 8 +--- airflow/providers/amazon/aws/sensors/batch.py | 40 +++- airflow/providers/amazon/aws/sensors/bedrock.py| 5 +- .../amazon/aws/sensors/cloud_formation.py | 13 +- airflow/providers/amazon/aws/sensors/comprehend.py | 8 +--- airflow/providers/amazon/aws/sensors/dms.py| 16 ++- airflow/providers/amazon/aws/sensors/ec2.py| 8 +--- airflow/providers/amazon/aws/sensors/ecs.py| 10 ++-- airflow/providers/amazon/aws/sensors/eks.py| 12 ++--- airflow/providers/amazon/aws/sensors/emr.py| 45 -- airflow/providers/amazon/aws/sensors/glacier.py| 8 +--- airflow/providers/amazon/aws/sensors/glue.py | 9 .../amazon/aws/sensors/glue_catalog_partition.py | 8 +--- .../providers/amazon/aws/sensors/glue_crawler.py | 8 +--- .../amazon/aws/sensors/kinesis_analytics.py| 5 +- .../amazon/aws/sensors/lambda_function.py | 10 ++-- .../amazon/aws/sensors/opensearch_serverless.py| 5 +- airflow/providers/amazon/aws/sensors/quicksight.py | 7 +-- .../amazon/aws/sensors/redshift_cluster.py | 8 +--- airflow/providers/amazon/aws/sensors/s3.py | 17 ++- airflow/providers/amazon/aws/sensors/sagemaker.py | 10 ++-- airflow/providers/amazon/aws/sensors/sqs.py| 14 ++ .../providers/amazon/aws/sensors/step_function.py | 8 +--- tests/providers/amazon/aws/sensors/test_athena.py | 15 ++ tests/providers/amazon/aws/sensors/test_batch.py | 46 -- tests/providers/amazon/aws/sensors/test_bedrock.py | 54 ++ 
.../amazon/aws/sensors/test_cloud_formation.py | 27 +++ .../amazon/aws/sensors/test_comprehend.py | 28 +++ tests/providers/amazon/aws/sensors/test_dms.py | 24 ++ tests/providers/amazon/aws/sensors/test_ecs.py | 20 +--- .../aws/sensors/test_emr_serverless_application.py | 15 +- .../amazon/aws/sensors/test_emr_serverless_job.py | 13 +- tests/providers/amazon/aws/sensors/test_glacier.py | 14 ++ tests/providers/amazon/aws/sensors/test_glue.py| 10 ++-- .../aws/sensors/test_glue_catalog_partition.py | 10 ++-- .../amazon/aws/sensors/test_glue_crawler.py| 10 ++-- .../amazon/aws/sensors/test_glue_data_quality.py | 28 +++ .../amazon/aws/sensors/test_kinesis_analytics.py | 28 +++ .../amazon/aws/sensors/test_lambda_function.py | 10 ++-- .../aws/sensors/test_opensearch_serverless.py | 17 ++- .../amazon/aws/sensors/test_quicksight.py | 17 ++- tests/providers/amazon/aws/sensors/test_s3.py | 26 +++ .../amazon/aws/sensors/test_sagemaker_base.py | 10 ++-- tests/providers/amazon/aws/sensors/test_sqs.py | 28 +++ .../amazon/aws/sensors/test_step_function.py | 17 ++- 45 files changed, 170 insertions(+), 579 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/athena.py b/airflow/providers/amazon/aws/sensors/athena.py index 38f2bb54f8..1dc8a4fe33 100644 --- a/airflow/providers/amazon/aws/sensors/athena.py +++ b/airflow/providers/amazon/aws/sensors/athena.py @@ -25,7 +25,7 @@ from airflow.providers.amazon.aws.utils.mixins import aws_template_fields if TYPE_CHECKING: from airflow.utils.context import Context -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.athena import AthenaHook @@ -88,11 +88,7 @@ class AthenaSensor(AwsBaseSensor[AthenaHook]): state = self.hook.poll_query_status(self.query_execution_id, self.max_retries, self.sleep_time) if state in self.FAILURE_STATES: -# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 -message = 
"Athena sensor failed" -if self.soft_fail: -raise AirflowSkipException(message) -raise AirflowException(message) +raise AirflowException("Athena sensor failed") if state in self.INTERMEDIATE_STATES: return False diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index c5dcb0e46d..9c1a29f809 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/pr
(airflow) branch main updated: Mark TestSparkKubernetes test as db test (#41500)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 666183f8d9 Mark TestSparkKubernetes test as db test (#41500) 666183f8d9 is described below commit 666183f8d946e18948a8e7651eb0a86137cbaed4 Author: Jarek Potiuk AuthorDate: Thu Aug 15 16:02:39 2024 +0200 Mark TestSparkKubernetes test as db test (#41500) This test missed the mark and it has not been detected because of a buggy pytest behaviour https://github.com/pytest-dev/pytest/issues/12605 that we had to work around in #41499 --- tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py index 343190d093..bc8404aa85 100644 --- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -187,6 +187,7 @@ def create_context(task): } +@pytest.mark.db_test @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_requested_container_logs") @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_start")
(airflow) branch main updated: Fix AWS Redshift operators and sensors (#41191)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 7fe573317e Fix AWS Redshift operators and sensors (#41191) 7fe573317e is described below commit 7fe573317eb630c2d176329c599d6fbbb30f4378 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Thu Aug 1 13:54:14 2024 -0400 Fix AWS Redshift operators and sensors (#41191) --- .../amazon/aws/operators/redshift_cluster.py | 118 - .../amazon/aws/operators/test_redshift_cluster.py | 25 - .../providers/amazon/aws/example_redshift.py | 4 +- 3 files changed, 91 insertions(+), 56 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py index f31b7db09a..4666445f96 100644 --- a/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.triggers.redshift_cluster import ( RedshiftResumeClusterTrigger, ) from airflow.providers.amazon.aws.utils import validate_execute_complete_event +from airflow.utils.helpers import prune_dict if TYPE_CHECKING: from airflow.utils.context import Context @@ -507,8 +508,8 @@ class RedshiftResumeClusterOperator(BaseOperator): aws_conn_id: str | None = "aws_default", wait_for_completion: bool = False, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), -poll_interval: int = 10, -max_attempts: int = 10, +poll_interval: int = 30, +max_attempts: int = 30, **kwargs, ): super().__init__(**kwargs) @@ -542,38 +543,38 @@ class RedshiftResumeClusterOperator(BaseOperator): else: raise error -if self.deferrable: -cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) -if cluster_state == "available": -self.log.info("Resumed cluster 
successfully") -elif cluster_state == "deleting": -raise AirflowException( -"Unable to resume cluster since cluster is currently in status: %s", cluster_state -) +if self.wait_for_completion: +if self.deferrable: +cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) +if cluster_state == "available": +self.log.info("Resumed cluster successfully") +elif cluster_state == "deleting": +raise AirflowException( +"Unable to resume cluster since cluster is currently in status: %s", cluster_state +) +else: +self.defer( +trigger=RedshiftResumeClusterTrigger( +cluster_identifier=self.cluster_identifier, +waiter_delay=self.poll_interval, +waiter_max_attempts=self.max_attempts, +aws_conn_id=self.aws_conn_id, +), +method_name="execute_complete", +# timeout is set to ensure that if a trigger dies, the timeout does not restart +# 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent) +timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60), +) else: -self.defer( -trigger=RedshiftResumeClusterTrigger( -cluster_identifier=self.cluster_identifier, -waiter_delay=self.poll_interval, -waiter_max_attempts=self.max_attempts, -aws_conn_id=self.aws_conn_id, -), -method_name="execute_complete", -# timeout is set to ensure that if a trigger dies, the timeout does not restart -# 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent) -timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60), +waiter = redshift_hook.get_waiter("cluster_resumed") +waiter.wait( +ClusterIdentifier=self.cluster_identifier, +WaiterConfig={ +"Delay": self.poll_interval, +"MaxAttempts":
(airflow) branch main updated (f5d8b0e796 -> ab0cf2eb25)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from f5d8b0e796 Skip tests in DB isolation mode in `tests/providers/amazon/aws/sensors/test_base_aws.py` (#41157) add ab0cf2eb25 Deprecate `SageMakerTrainingPrintLogTrigger` (#41158) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/operators/sagemaker.py | 19 --- airflow/providers/amazon/aws/triggers/sagemaker.py | 10 +- 2 files changed, 13 insertions(+), 16 deletions(-)
(airflow) branch main updated (6a5ae50891 -> 26f317ebce)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 6a5ae50891 Update providers metadata 2024-07-31 (#41159) add 26f317ebce Skip tests in DB isolation mode in `tests/providers/amazon/aws/operators/test_base_aws.py` (#41156) No new revisions were added by this update. Summary of changes: tests/providers/amazon/aws/operators/test_base_aws.py | 2 ++ 1 file changed, 2 insertions(+)
(airflow) branch main updated: Fix `EmrServerlessStartJobOperator` (#41103)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 97c4fdce71 Fix `EmrServerlessStartJobOperator` (#41103) 97c4fdce71 is described below commit 97c4fdce71e0665997b7c3a8f78324af616c91b4 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Jul 29 16:14:11 2024 -0400 Fix `EmrServerlessStartJobOperator` (#41103) --- airflow/providers/amazon/aws/operators/emr.py | 46 +++--- .../amazon/aws/operators/test_emr_serverless.py| 21 ++ 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index c13c622937..fb2f5de478 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -1382,30 +1382,30 @@ class EmrServerlessStartJobOperator(BaseOperator): self.persist_links(context) -if self.deferrable: -self.defer( -trigger=EmrServerlessStartJobTrigger( -application_id=self.application_id, -job_id=self.job_id, -waiter_delay=self.waiter_delay, -waiter_max_attempts=self.waiter_max_attempts, -aws_conn_id=self.aws_conn_id, -), -method_name="execute_complete", -timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), -) - if self.wait_for_completion: -waiter = self.hook.get_waiter("serverless_job_completed") -wait( -waiter=waiter, -waiter_max_attempts=self.waiter_max_attempts, -waiter_delay=self.waiter_delay, -args={"applicationId": self.application_id, "jobRunId": self.job_id}, -failure_message="Serverless Job failed", -status_message="Serverless Job status is", -status_args=["jobRun.state", "jobRun.stateDetails"], -) +if self.deferrable: +self.defer( +trigger=EmrServerlessStartJobTrigger( +application_id=self.application_id, +job_id=self.job_id, +waiter_delay=self.waiter_delay, 
+waiter_max_attempts=self.waiter_max_attempts, +aws_conn_id=self.aws_conn_id, +), +method_name="execute_complete", +timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), +) +else: +waiter = self.hook.get_waiter("serverless_job_completed") +wait( +waiter=waiter, +waiter_max_attempts=self.waiter_max_attempts, +waiter_delay=self.waiter_delay, +args={"applicationId": self.application_id, "jobRunId": self.job_id}, +failure_message="Serverless Job failed", +status_message="Serverless Job status is", +status_args=["jobRun.state", "jobRun.stateDetails"], +) return self.job_id diff --git a/tests/providers/amazon/aws/operators/test_emr_serverless.py b/tests/providers/amazon/aws/operators/test_emr_serverless.py index 4804f22869..12c5cc9380 100644 --- a/tests/providers/amazon/aws/operators/test_emr_serverless.py +++ b/tests/providers/amazon/aws/operators/test_emr_serverless.py @@ -836,6 +836,27 @@ class TestEmrServerlessStartJobOperator: with pytest.raises(TaskDeferred): operator.execute(self.mock_context) +@mock.patch.object(EmrServerlessHook, "conn") +def test_start_job_deferrable_without_wait_for_completion(self, mock_conn): +mock_conn.get_application.return_value = {"application": {"state": "STARTED"}} +mock_conn.start_job_run.return_value = { +"jobRunId": job_run_id, +"ResponseMetadata": {"HTTPStatusCode": 200}, +} +operator = EmrServerlessStartJobOperator( +task_id=task_id, +application_id=application_id, +execution_role_arn=execution_role_arn, +job_driver=job_driver, +configuration_overrides=configuration_overrides, +deferrable=True, +wait_for_completion=False, +) + +result = operator.execute(self.mock_context) + +assert result == job_run_id + @mock.patch.object(EmrServerlessHook, "get_waiter") @mock.patch.object(EmrServerlessHook, "conn") def test_start_job_deferrable_app_not_started(self, mock_conn, mock_get_waiter):
(airflow) branch main updated: Make EMR Container Trigger max attempts retries match the Operator (#41008)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new c519920661 Make EMR Container Trigger max attempts retries match the Operator (#41008) c519920661 is described below commit c519920661133a06e917a781e73caeac111b26f5 Author: Niko Oliveira AuthorDate: Fri Jul 26 13:52:37 2024 -0700 Make EMR Container Trigger max attempts retries match the Operator (#41008) The EMR Container Operator will wait indefinitely by default (on the wait for completion path) however when it is deferred the Trigger has a default timeout of 600s which does not match the user's expectations when using the operator. Update the Trigger to have an infinite try count by default to match the Operator behaviour. --- airflow/providers/amazon/aws/triggers/emr.py| 4 +++- tests/providers/amazon/aws/triggers/test_emr.py | 24 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/triggers/emr.py b/airflow/providers/amazon/aws/triggers/emr.py index 8b64d84f63..9abfe120d2 100644 --- a/airflow/providers/amazon/aws/triggers/emr.py +++ b/airflow/providers/amazon/aws/triggers/emr.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import sys import warnings from typing import TYPE_CHECKING @@ -174,6 +175,7 @@ class EmrContainerTrigger(AwsBaseWaiterTrigger): :param job_id: job_id to check the state :param aws_conn_id: Reference to AWS connection id :param waiter_delay: polling period in seconds to check for the status +:param waiter_max_attempts: The maximum number of attempts to be made. Defaults to an infinite wait. 
""" def __init__( @@ -183,7 +185,7 @@ class EmrContainerTrigger(AwsBaseWaiterTrigger): aws_conn_id: str | None = "aws_default", poll_interval: int | None = None, # deprecated waiter_delay: int = 30, -waiter_max_attempts: int = 600, +waiter_max_attempts: int = sys.maxsize, ): if poll_interval is not None: warnings.warn( diff --git a/tests/providers/amazon/aws/triggers/test_emr.py b/tests/providers/amazon/aws/triggers/test_emr.py index 92fd08857d..3469ee4c13 100644 --- a/tests/providers/amazon/aws/triggers/test_emr.py +++ b/tests/providers/amazon/aws/triggers/test_emr.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +import sys + from airflow.providers.amazon.aws.triggers.emr import ( EmrAddStepsTrigger, EmrContainerTrigger, @@ -209,6 +211,28 @@ class TestEmrContainerTrigger: "aws_conn_id": "aws_default", } +def test_serialization_default_max_attempts(self): +virtual_cluster_id = "test_virtual_cluster_id" +job_id = "test_job_id" +waiter_delay = 30 +aws_conn_id = "aws_default" + +trigger = EmrContainerTrigger( +virtual_cluster_id=virtual_cluster_id, +job_id=job_id, +waiter_delay=waiter_delay, +aws_conn_id=aws_conn_id, +) +classpath, kwargs = trigger.serialize() +assert classpath == "airflow.providers.amazon.aws.triggers.emr.EmrContainerTrigger" +assert kwargs == { +"virtual_cluster_id": "test_virtual_cluster_id", +"job_id": "test_job_id", +"waiter_delay": 30, +"waiter_max_attempts": sys.maxsize, +"aws_conn_id": "aws_default", +} + class TestEmrStepSensorTrigger: def test_serialization(self):
(airflow) branch main updated: Fix `RdsStopDbOperator` operator in deferrable mode (#41059)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 83ca61a501 Fix `RdsStopDbOperator` operator in deferrable mode (#41059) 83ca61a501 is described below commit 83ca61a501d755669fc83b1ad9038d0ca9d600ad Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Fri Jul 26 16:52:25 2024 -0400 Fix `RdsStopDbOperator` operator in deferrable mode (#41059) --- airflow/providers/amazon/aws/hooks/rds.py| 6 +- airflow/providers/amazon/aws/operators/rds.py| 37 ++-- airflow/providers/amazon/aws/waiters/rds.json| 253 +++ tests/providers/amazon/aws/hooks/test_rds.py | 24 +-- tests/providers/amazon/aws/operators/test_rds.py | 14 +- 5 files changed, 280 insertions(+), 54 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/rds.py b/airflow/providers/amazon/aws/hooks/rds.py index 8219a37757..588d78c782 100644 --- a/airflow/providers/amazon/aws/hooks/rds.py +++ b/airflow/providers/amazon/aws/hooks/rds.py @@ -259,7 +259,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): return self.get_db_instance_state(db_instance_id) target_state = target_state.lower() -if target_state in ("available", "deleted"): +if target_state in ("available", "deleted", "stopped"): waiter = self.conn.get_waiter(f"db_instance_{target_state}") # type: ignore wait( waiter=waiter, @@ -272,7 +272,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): ) else: self._wait_for_state(poke, target_state, check_interval, max_attempts) -self.log.info("DB cluster snapshot '%s' reached the '%s' state", db_instance_id, target_state) +self.log.info("DB cluster '%s' reached the '%s' state", db_instance_id, target_state) def get_db_cluster_state(self, db_cluster_id: str) -> str: """ @@ -310,7 +310,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): return self.get_db_cluster_state(db_cluster_id) target_state = target_state.lower() 
-if target_state in ("available", "deleted"): +if target_state in ("available", "deleted", "stopped"): waiter = self.conn.get_waiter(f"db_cluster_{target_state}") # type: ignore waiter.wait( DBClusterIdentifier=db_cluster_id, diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index a2f35b5081..f37c698d87 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -36,6 +36,7 @@ from airflow.providers.amazon.aws.utils import validate_execute_complete_event from airflow.providers.amazon.aws.utils.rds import RdsDbType from airflow.providers.amazon.aws.utils.tags import format_tags from airflow.providers.amazon.aws.utils.waiter_with_logging import wait +from airflow.utils.helpers import prune_dict if TYPE_CHECKING: from mypy_boto3_rds.type_defs import TagTypeDef @@ -782,7 +783,7 @@ class RdsStartDbOperator(RdsBaseOperator): aws_conn_id=self.aws_conn_id, region_name=self.region_name, response=start_db_response, -db_type=RdsDbType.INSTANCE, +db_type=self.db_type, ), method_name="execute_complete", ) @@ -881,12 +882,25 @@ class RdsStopDbOperator(RdsBaseOperator): aws_conn_id=self.aws_conn_id, region_name=self.region_name, response=stop_db_response, -db_type=RdsDbType.INSTANCE, +db_type=self.db_type, ), method_name="execute_complete", ) elif self.wait_for_completion: -self._wait_until_db_stopped() +waiter = self.hook.get_waiter(f"db_{self.db_type.value}_stopped") +waiter_key = ( +"DBInstanceIdentifier" if self.db_type == RdsDbType.INSTANCE else "DBClusterIdentifier" +) +kwargs = {waiter_key: self.db_identifier} +waiter.wait( +WaiterConfig=prune_dict( +{ +"Delay": self.waiter_delay, +"MaxAttempts": self.waiter_max_attempts, +} +), +**kwargs, +) return json.dumps(stop_db_response, d
(airflow) branch main updated: Bug fix: sync perm command not able to use custom security manager (#41020)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 95cab23792 Bug fix: sync perm command not able to use custom security manager (#41020) 95cab23792 is described below commit 95cab23792c80f0ecf980ac0a74b8d08431fb3bb Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Thu Jul 25 19:49:51 2024 +0530 Bug fix: sync perm command not able to use custom security manager (#41020) --- airflow/providers/fab/auth_manager/cli_commands/utils.py | 4 1 file changed, 4 insertions(+) diff --git a/airflow/providers/fab/auth_manager/cli_commands/utils.py b/airflow/providers/fab/auth_manager/cli_commands/utils.py index 8361aab51b..78403e2407 100644 --- a/airflow/providers/fab/auth_manager/cli_commands/utils.py +++ b/airflow/providers/fab/auth_manager/cli_commands/utils.py @@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Generator from flask import Flask import airflow +from airflow.configuration import conf from airflow.www.extensions.init_appbuilder import init_appbuilder from airflow.www.extensions.init_views import init_plugins @@ -44,5 +45,8 @@ def _return_appbuilder(app: Flask) -> AirflowAppBuilder: def get_application_builder() -> Generator[AirflowAppBuilder, None, None]: static_folder = os.path.join(os.path.dirname(airflow.__file__), "www", "static") flask_app = Flask(__name__, static_folder=static_folder) +webserver_config = conf.get_mandatory_value("webserver", "config_file") with flask_app.app_context(): +# Enable customizations in webserver_config.py to be applied via Flask.current_app. +flask_app.config.from_pyfile(webserver_config, silent=True) yield _return_appbuilder(flask_app)
(airflow) branch main updated (b4e82cf66f -> 68b3159210)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from b4e82cf66f Update `example_redshift` and `example_redshift_s3_transfers` to use `RedshiftDataHook` instead of `RedshiftSQLHook` (#40970) add 68b3159210 Add RedriveExecution support to `StepFunctionStartExecutionOperator` (#40976) No new revisions were added by this update. Summary of changes: .../providers/amazon/aws/hooks/step_function.py| 18 +++ .../amazon/aws/operators/step_function.py | 14 +++-- .../amazon/aws/hooks/test_step_function.py | 30 ++ .../amazon/aws/operators/test_step_function.py | 36 -- 4 files changed, 94 insertions(+), 4 deletions(-)
(airflow) branch main updated: Update `example_redshift` and `example_redshift_s3_transfers` to use `RedshiftDataHook` instead of `RedshiftSQLHook` (#40970)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new b4e82cf66f Update `example_redshift` and `example_redshift_s3_transfers` to use `RedshiftDataHook` instead of `RedshiftSQLHook` (#40970) b4e82cf66f is described below commit b4e82cf66fb4d833a25de5e2688b44e7b4ddf4bb Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Wed Jul 24 14:08:06 2024 -0400 Update `example_redshift` and `example_redshift_s3_transfers` to use `RedshiftDataHook` instead of `RedshiftSQLHook` (#40970) --- .../amazon/aws/transfers/redshift_to_s3.py | 1 + .../amazon/aws/transfers/s3_to_redshift.py | 1 + .../amazon/aws/transfers/test_redshift_to_s3.py| 11 - .../amazon/aws/transfers/test_s3_to_redshift.py| 13 -- .../providers/amazon/aws/example_redshift.py | 35 .../amazon/aws/example_redshift_s3_transfers.py| 47 +- 6 files changed, 20 insertions(+), 88 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py index f6aafeba59..73578ea539 100644 --- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py @@ -84,6 +84,7 @@ class RedshiftToS3Operator(BaseOperator): "unload_options", "select_query", "redshift_conn_id", +"redshift_data_api_kwargs", ) template_ext: Sequence[str] = (".sql",) template_fields_renderers = {"select_query": "sql"} diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py index 6418c111e2..161276b33c 100644 --- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py +++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py @@ -77,6 +77,7 @@ class S3ToRedshiftOperator(BaseOperator): "copy_options", "redshift_conn_id", "method", 
+"redshift_data_api_kwargs", "aws_conn_id", ) template_ext: Sequence[str] = () diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py index d025b4836f..d2af90a445 100644 --- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py +++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py @@ -364,17 +364,6 @@ class TestRedshiftToS3Transfer: assert extra["role_arn"] in unload_query assert_equal_ignore_multiple_spaces(mock_run.call_args.args[0], unload_query) -def test_template_fields_overrides(self): -assert RedshiftToS3Operator.template_fields == ( -"s3_bucket", -"s3_key", -"schema", -"table", -"unload_options", -"select_query", -"redshift_conn_id", -) - @pytest.mark.parametrize("param", ["sql", "parameters"]) def test_invalid_param_in_redshift_data_api_kwargs(self, param): """ diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py index 6e3cbb2a1c..cb5ef7fdb7 100644 --- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py +++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py @@ -381,19 +381,6 @@ class TestS3ToRedshiftTransfer: assert mock_run.call_count == 1 assert_equal_ignore_multiple_spaces(actual_copy_query, expected_copy_query) -def test_template_fields_overrides(self): -assert S3ToRedshiftOperator.template_fields == ( -"s3_bucket", -"s3_key", -"schema", -"table", -"column_list", -"copy_options", -"redshift_conn_id", -"method", -"aws_conn_id", -) - def test_execute_unavailable_method(self): """ Test execute unavailable method diff --git a/tests/system/providers/amazon/aws/example_redshift.py b/tests/system/providers/amazon/aws/example_redshift.py index 84be4c702c..cc88811bef 100644 --- a/tests/system/providers/amazon/aws/example_redshift.py +++ b/tests/system/providers/amazon/aws/example_redshift.py @@ -20,12 +20,8 @@ from __future__ import annotations from datetime 
import datetime -from airflow import settings -from airflow.decorators import task -from airflow.mod
(airflow) branch main updated (2a377282d7 -> 5702481af4)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 2a377282d7 Fix indentation of scheduler_job_runner for standalone dag processor (#40929) add 5702481af4 Move AWS Managed Service for Apache Flink sensor states to Hook (#40896) No new revisions were added by this update. Summary of changes: .../amazon/aws/hooks/kinesis_analytics.py | 28 .../amazon/aws/sensors/kinesis_analytics.py| 162 ++--- tests/always/test_project_structure.py | 1 + 3 files changed, 106 insertions(+), 85 deletions(-)
(airflow) branch main updated: Update Glue Data Quality system test sample data (#40870)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new e774d08773 Update Glue Data Quality system test sample data (#40870) e774d08773 is described below commit e774d0877308a715425baaccf35af43abfbff6d9 Author: Syed Hussain <103602455+syeda...@users.noreply.github.com> AuthorDate: Fri Jul 19 07:36:40 2024 -0700 Update Glue Data Quality system test sample data (#40870) --- tests/system/providers/amazon/aws/example_glue_data_quality.py | 3 +-- .../amazon/aws/example_glue_data_quality_with_recommendation.py| 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/system/providers/amazon/aws/example_glue_data_quality.py b/tests/system/providers/amazon/aws/example_glue_data_quality.py index 15da75520d..e9b8f418e2 100644 --- a/tests/system/providers/amazon/aws/example_glue_data_quality.py +++ b/tests/system/providers/amazon/aws/example_glue_data_quality.py @@ -42,8 +42,7 @@ sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).bu DAG_ID = "example_glue_data_quality" SAMPLE_DATA = """"Alice",20 "Bob",25 -"Charlie",30 -""" +"Charlie",30""" SAMPLE_FILENAME = "airflow_sample.csv" RULE_SET = """ diff --git a/tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py b/tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py index c63d7a56f7..231750e971 100644 --- a/tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py +++ b/tests/system/providers/amazon/aws/example_glue_data_quality_with_recommendation.py @@ -45,8 +45,7 @@ sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).bu DAG_ID = "example_glue_data_quality_with_recommendation" SAMPLE_DATA = """"Alice",20 "Bob",25 -"Charlie",30 -""" +"Charlie",30""" SAMPLE_FILENAME = "airflow_sample.csv"
(airflow) branch main updated: Fix `RedshiftCreateClusterOperator` to always specify `PubliclyAccessible` (#40872)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 06b19eb8b0 Fix `RedshiftCreateClusterOperator` to always specify `PubliclyAccessible` (#40872) 06b19eb8b0 is described below commit 06b19eb8b099ba192d8bdb9877e784221ca6297d Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Thu Jul 18 15:56:39 2024 -0400 Fix `RedshiftCreateClusterOperator` to always specify `PubliclyAccessible` (#40872) --- airflow/providers/amazon/aws/operators/redshift_cluster.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py index 107a2cc5d6..f31b7db09a 100644 --- a/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -256,8 +256,6 @@ class RedshiftCreateClusterOperator(BaseOperator): params["ClusterVersion"] = self.cluster_version if self.allow_version_upgrade: params["AllowVersionUpgrade"] = self.allow_version_upgrade -if self.publicly_accessible: -params["PubliclyAccessible"] = self.publicly_accessible if self.encrypted: params["Encrypted"] = self.encrypted if self.hsm_client_certificate_identifier: @@ -287,6 +285,10 @@ class RedshiftCreateClusterOperator(BaseOperator): if self.default_iam_role_arn: params["DefaultIamRoleArn"] = self.default_iam_role_arn +# PubliclyAccessible is True by default on Redshift side, hence, we should always set it regardless +# of its value +params["PubliclyAccessible"] = self.publicly_accessible + cluster = redshift_hook.create_cluster( self.cluster_identifier, self.node_type,
(airflow) branch main updated: Introduce Amazon Kinesis Analytics V2 (Managed Service for Apache Flink application) (#40765)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new a4e3fbed1a Introduce Amazon Kinesis Analytics V2 (Managed Service for Apache Flink application) (#40765) a4e3fbed1a is described below commit a4e3fbed1a07b5685820a468e2d2ebb986b7d6b4 Author: GPK AuthorDate: Thu Jul 18 19:18:48 2024 +0100 Introduce Amazon Kinesis Analytics V2 (Managed Service for Apache Flink application) (#40765) --- .../amazon/aws/hooks/kinesis_analytics.py | 37 ++ .../amazon/aws/operators/kinesis_analytics.py | 348 +++ .../amazon/aws/sensors/kinesis_analytics.py| 242 ++ .../amazon/aws/triggers/kinesis_analytics.py | 69 +++ .../amazon/aws/waiters/kinesisanalyticsv2.json | 151 +++ airflow/providers/amazon/provider.yaml | 18 + .../operators/kinesis_analytics.rst| 115 + .../aws/amazon-kinesis-analytics_light...@4x.png | Bin 0 -> 16946 bytes docs/spelling_wordlist.txt | 1 + .../amazon/aws/hooks/test_kinesis_analytics.py | 31 ++ .../amazon/aws/operators/test_kinesis_analytics.py | 485 + .../amazon/aws/sensors/test_kinesis_analytics.py | 172 .../amazon/aws/triggers/test_kinesis_analytics.py | 78 .../amazon/aws/waiters/test_kinesis_analytics.py | 106 + .../amazon/aws/example_kinesis_analytics.py| 272 15 files changed, 2125 insertions(+) diff --git a/airflow/providers/amazon/aws/hooks/kinesis_analytics.py b/airflow/providers/amazon/aws/hooks/kinesis_analytics.py new file mode 100644 index 00..6f346dec45 --- /dev/null +++ b/airflow/providers/amazon/aws/hooks/kinesis_analytics.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook + + +class KinesisAnalyticsV2Hook(AwsBaseHook): +""" +Interact with Amazon Kinesis Analytics V2. + +Provide thin wrapper around :external+boto3:py:class:`boto3.client("kinesisanalyticsv2") `. + +Additional arguments (such as ``aws_conn_id``) may be specified and +are passed down to the underlying AwsBaseHook. + +.. seealso:: +- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` +""" + +def __init__(self, *args, **kwargs) -> None: +kwargs["client_type"] = "kinesisanalyticsv2" +super().__init__(*args, **kwargs) diff --git a/airflow/providers/amazon/aws/operators/kinesis_analytics.py b/airflow/providers/amazon/aws/operators/kinesis_analytics.py new file mode 100644 index 00..727aa714c6 --- /dev/null +++ b/airflow/providers/amazon/aws/operators/kinesis_analytics.py @@ -0,0 +1,348 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Sequence + +from botocore.exceptions import ClientError + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook +from airflow.providers.amazon.aws.oper
(airflow) branch main updated (ad5921837e -> d08d10a595)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from ad5921837e Set parallelism log messages to warning level for better visibility (#39298) add d08d10a595 Deregister the ECS task definition in the system test `example_batch` (#40823) No new revisions were added by this update. Summary of changes: tests/system/providers/amazon/aws/example_batch.py | 8 1 file changed, 8 insertions(+)
(airflow) branch strip-overzealous-test deleted (was 40af8acc60)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch strip-overzealous-test in repository https://gitbox.apache.org/repos/asf/airflow.git was 40af8acc60 openlineage: do not check whitespace in SQL parser tests The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(airflow) branch main updated (dc6cc585bc -> 616c8816bf)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from dc6cc585bc Simplify _auth_manager_is_authorized_map function (#40803) add 616c8816bf openlineage: tests: do not check whitespace in returned SQL statement after splitting it (#40826) No new revisions were added by this update. Summary of changes: tests/providers/openlineage/test_sqlparser.py | 95 --- 1 file changed, 43 insertions(+), 52 deletions(-)
(airflow) branch main updated: Send important executor logs to task logs in `AwsBatchExecutor` (#40698)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 6b9214508a Send important executor logs to task logs in `AwsBatchExecutor` (#40698) 6b9214508a is described below commit 6b9214508ae8ff4d6d39e9ecda5138b5ba717ceb Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Thu Jul 11 11:49:36 2024 -0400 Send important executor logs to task logs in `AwsBatchExecutor` (#40698) --- .../amazon/aws/executors/batch/batch_executor.py | 35 .../aws/executors/batch/test_batch_executor.py | 47 +++--- 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/airflow/providers/amazon/aws/executors/batch/batch_executor.py b/airflow/providers/amazon/aws/executors/batch/batch_executor.py index cb9b575c77..4dca84ac19 100644 --- a/airflow/providers/amazon/aws/executors/batch/batch_executor.py +++ b/airflow/providers/amazon/aws/executors/batch/batch_executor.py @@ -20,8 +20,9 @@ from __future__ import annotations import contextlib +import logging import time -from collections import defaultdict, deque +from collections import deque from copy import deepcopy from typing import TYPE_CHECKING, Any, Dict, List, Sequence @@ -264,7 +265,6 @@ class AwsBatchExecutor(BaseExecutor): in the next iteration of the sync() method, unless it has exceeded the maximum number of attempts. If a job exceeds the maximum number of attempts, it is removed from the queue. 
""" -failure_reasons = defaultdict(int) for _ in range(len(self.pending_jobs)): batch_job = self.pending_jobs.popleft() key = batch_job.key @@ -272,7 +272,7 @@ class AwsBatchExecutor(BaseExecutor): queue = batch_job.queue exec_config = batch_job.executor_config attempt_number = batch_job.attempt_number -_failure_reason = [] +failure_reason: str | None = None if timezone.utcnow() < batch_job.next_attempt_time: self.pending_jobs.append(batch_job) continue @@ -286,18 +286,18 @@ class AwsBatchExecutor(BaseExecutor): if error_code in INVALID_CREDENTIALS_EXCEPTIONS: self.pending_jobs.append(batch_job) raise -_failure_reason.append(str(e)) +failure_reason = str(e) except Exception as e: -_failure_reason.append(str(e)) - -if _failure_reason: -for reason in _failure_reason: -failure_reasons[reason] += 1 +failure_reason = str(e) +if failure_reason: if attempt_number >= int(self.__class__.MAX_SUBMIT_JOB_ATTEMPTS): -self.log.error( -"This job has been unsuccessfully attempted too many times (%s). Dropping the task.", +self.send_message_to_task_logs( +logging.ERROR, +"This job has been unsuccessfully attempted too many times (%s). Dropping the task. Reason: %s", attempt_number, +failure_reason, +ti=key, ) self.fail(key=key) else: @@ -322,11 +322,6 @@ class AwsBatchExecutor(BaseExecutor): # running_state is added in Airflow 2.10 and only needed to support task adoption # (an optional executor feature). self.running_state(key, job_id) -if failure_reasons: -self.log.error( -"Pending Batch jobs failed to launch for the following reasons: %s. 
Retrying later.", -dict(failure_reasons), -) def _describe_jobs(self, job_ids) -> list[BatchJob]: all_jobs = [] @@ -462,3 +457,11 @@ class AwsBatchExecutor(BaseExecutor): not_adopted_tis = [ti for ti in tis if ti not in adopted_tis] return not_adopted_tis + +def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey): +# TODO: remove this method when min_airflow_version is set to higher than 2.10.0 +try: +super().send_message_to_task_logs(level, msg, *args, ti=ti) +except AttributeError: +# ``send_message_to_task_logs`` is added in 2.10.0 +self.log.error(msg, *args) diff --git a/tests/providers/amazon/aws/executors/batch/test_batch_executor.py b/tests/providers/amazon/aws/executors/batch/test_batch_executor.py index cd6c2c87ac..3b81865810 100644 --- a/tests/providers/amazon/aws/executors/batch/test_batch_
(airflow) branch main updated: Use latest EMR release label in `example_emr` (#40702)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 17b792d898 Use latest EMR release label in `example_emr` (#40702) 17b792d898 is described below commit 17b792d898352b6a7cf67d2fc9804676e8c2105a Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Wed Jul 10 17:04:43 2024 -0400 Use latest EMR release label in `example_emr` (#40702) --- tests/system/providers/amazon/aws/example_emr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/providers/amazon/aws/example_emr.py b/tests/system/providers/amazon/aws/example_emr.py index 3efac9eda2..507ff8588a 100644 --- a/tests/system/providers/amazon/aws/example_emr.py +++ b/tests/system/providers/amazon/aws/example_emr.py @@ -71,7 +71,7 @@ SPARK_STEPS = [ JOB_FLOW_OVERRIDES: dict[str, Any] = { "Name": "PiCalc", -"ReleaseLabel": "emr-6.7.0", +"ReleaseLabel": "emr-7.1.0", "Applications": [{"Name": "Spark"}], "Instances": { "InstanceGroups": [
(airflow) branch main updated (c25858cb30 -> aca140a2c1)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from c25858cb30 feat(slack): add unfurl options to slack notifier (#40694) add aca140a2c1 Send important executor logs to task logs (#40468) No new revisions were added by this update. Summary of changes: airflow/executors/base_executor.py | 20 - .../amazon/aws/executors/ecs/ecs_executor.py | 37 ++--- airflow/utils/log/file_task_handler.py | 9 ++--- airflow/utils/log/task_context_logger.py | 28 - .../amazon/aws/executors/ecs/test_ecs_executor.py | 47 +- tests/test_utils/mock_executor.py | 4 +- tests/utils/log/test_task_context_logger.py| 19 + 7 files changed, 110 insertions(+), 54 deletions(-)
(airflow) branch main updated: Make `AwsAuthManager` compatible with only Airflow >= 2.9 (#40690)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 224cb75be1 Make `AwsAuthManager` compatible with only Airflow >= 2.9 (#40690) 224cb75be1 is described below commit 224cb75be10f71e34b4a81a9f4b7ed43f2f25db6 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Wed Jul 10 12:01:23 2024 -0400 Make `AwsAuthManager` compatible with only Airflow >= 2.9 (#40690) --- airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py | 10 ++ .../providers/amazon/aws/auth_manager/test_aws_auth_manager.py | 4 +++- tests/providers/amazon/aws/auth_manager/views/test_auth.py | 4 ++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py b/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py index f94e4de691..5660ec5d87 100644 --- a/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py +++ b/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py @@ -81,6 +81,16 @@ class AwsAuthManager(BaseAuthManager): """ def __init__(self, appbuilder: AirflowAppBuilder) -> None: +from packaging.version import Version + +from airflow.version import version + +# TODO: remove this if block when min_airflow_version is set to higher than 2.9.0 +if Version(version) < Version("2.9"): +raise AirflowOptionalProviderFeatureException( +"``AwsAuthManager`` is compatible with Airflow versions >= 2.9." 
+) + super().__init__(appbuilder) self._check_avp_schema_version() diff --git a/tests/providers/amazon/aws/auth_manager/test_aws_auth_manager.py b/tests/providers/amazon/aws/auth_manager/test_aws_auth_manager.py index 03684e6336..a4a9472000 100644 --- a/tests/providers/amazon/aws/auth_manager/test_aws_auth_manager.py +++ b/tests/providers/amazon/aws/auth_manager/test_aws_auth_manager.py @@ -23,7 +23,7 @@ import pytest from flask import Flask, session from flask_appbuilder.menu import MenuItem -from tests.test_utils.compat import AIRFLOW_V_2_8_PLUS +from tests.test_utils.compat import AIRFLOW_V_2_8_PLUS, AIRFLOW_V_2_9_PLUS try: from airflow.auth.managers.models.resource_details import ( @@ -66,6 +66,8 @@ from tests.test_utils.www import check_content_in_response if TYPE_CHECKING: from airflow.auth.managers.base_auth_manager import ResourceMethod +pytestmark = pytest.mark.skipif(not AIRFLOW_V_2_9_PLUS, reason="Test requires Airflow 2.9+") + mock = Mock() SAML_METADATA_PARSED = { diff --git a/tests/providers/amazon/aws/auth_manager/views/test_auth.py b/tests/providers/amazon/aws/auth_manager/views/test_auth.py index 7474d74727..7db5ad7910 100644 --- a/tests/providers/amazon/aws/auth_manager/views/test_auth.py +++ b/tests/providers/amazon/aws/auth_manager/views/test_auth.py @@ -23,12 +23,12 @@ from flask import session, url_for from airflow.exceptions import AirflowException from airflow.www import app as application -from tests.test_utils.compat import AIRFLOW_V_2_8_PLUS +from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS from tests.test_utils.config import conf_vars pytest.importorskip("onelogin") -pytestmark = pytest.mark.skipif(not AIRFLOW_V_2_8_PLUS, reason="Test requires Airflow 2.8+") +pytestmark = pytest.mark.skipif(not AIRFLOW_V_2_9_PLUS, reason="Test requires Airflow 2.9+") SAML_METADATA_URL = "/saml/metadata"
(airflow) branch main updated (728ee2ea08 -> 0d0cfec70d)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 728ee2ea08 fix two typos (#40670) add 0d0cfec70d Add serialization opt to s3 operator (#40659) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/operators/s3.py| 12 ++- tests/providers/amazon/aws/operators/test_s3.py | 42 - 2 files changed, 52 insertions(+), 2 deletions(-)
(airflow) branch main updated: Fix Glue system test (#40612)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 9d92feda7f Fix Glue system test (#40612) 9d92feda7f is described below commit 9d92feda7fae0f014743153ce69656eb339fc420 Author: D. Ferruzzi AuthorDate: Thu Jul 4 14:30:28 2024 -0700 Fix Glue system test (#40612) The new Sensor was not added to the chain(), causing it to fire off before the database it is supposed to watch is even created. --- tests/system/providers/amazon/aws/example_glue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/system/providers/amazon/aws/example_glue.py b/tests/system/providers/amazon/aws/example_glue.py index fb44ad248f..c16aaf8677 100644 --- a/tests/system/providers/amazon/aws/example_glue.py +++ b/tests/system/providers/amazon/aws/example_glue.py @@ -207,6 +207,7 @@ with DAG( # TEST BODY crawl_s3, wait_for_crawl, +wait_for_catalog_partition, submit_glue_job, wait_for_job, # TEST TEARDOWN
(airflow) branch main updated (b8aab5cf63 -> 015ac89689)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from b8aab5cf63 Use base aws classes in AWS Glue Crawlers Operators/Sensors/Triggers (#40504) add 015ac89689 Use base aws classes in AWS Glue Data Catalog Sensors (#40492) No new revisions were added by this update. Summary of changes: .../amazon/aws/sensors/glue_catalog_partition.py | 42 +- airflow/providers/amazon/aws/triggers/glue.py | 18 -- .../operators/glue.rst | 14 tests/always/test_project_structure.py | 2 -- .../aws/sensors/test_glue_catalog_partition.py | 25 + tests/providers/amazon/aws/triggers/test_glue.py | 22 tests/system/providers/amazon/aws/example_glue.py | 15 ++-- 7 files changed, 113 insertions(+), 25 deletions(-)
(airflow) branch main updated (0fae73dc45 -> b8aab5cf63)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 0fae73dc45 Update docs for RC2 openlineage provider (#40551) add b8aab5cf63 Use base aws classes in AWS Glue Crawlers Operators/Sensors/Triggers (#40504) No new revisions were added by this update. Summary of changes: .../providers/amazon/aws/operators/glue_crawler.py | 41 .../providers/amazon/aws/sensors/glue_crawler.py | 28 +++--- .../providers/amazon/aws/triggers/glue_crawler.py | 9 +- .../operators/glue.rst | 5 + .../amazon/aws/operators/test_glue_crawler.py | 105 ++--- .../amazon/aws/sensors/test_glue_crawler.py| 25 + .../amazon/aws/triggers/test_glue_crawler.py | 24 - 7 files changed, 189 insertions(+), 48 deletions(-)
(airflow) branch main updated: Adding cluster to ecs trigger event to avoid defer error (#40482)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 6c12744dd8 Adding cluster to ecs trigger event to avoid defer error (#40482) 6c12744dd8 is described below commit 6c12744dd8656e1d8b066c7edc8f0ab60ac124d2 Author: ellisms <114107920+elli...@users.noreply.github.com> AuthorDate: Fri Jun 28 11:49:20 2024 -0400 Adding cluster to ecs trigger event to avoid defer error (#40482) * Adding cluster to ecs trigger event to avoid defer error * restore cluster after defer --- airflow/providers/amazon/aws/operators/ecs.py| 1 + airflow/providers/amazon/aws/triggers/ecs.py | 4 +++- tests/providers/amazon/aws/operators/test_ecs.py | 4 ++-- tests/providers/amazon/aws/triggers/test_ecs.py | 1 + 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py index 294291dc0a..1cd8685cf2 100644 --- a/airflow/providers/amazon/aws/operators/ecs.py +++ b/airflow/providers/amazon/aws/operators/ecs.py @@ -586,6 +586,7 @@ class EcsRunTaskOperator(EcsBaseOperator): if event["status"] != "success": raise AirflowException(f"Error in task execution: {event}") self.arn = event["task_arn"] # restore arn to its updated value, needed for next steps +self.cluster = event["cluster"] self._after_execution() if self._aws_logs_enabled(): # same behavior as non-deferrable mode, return last line of logs of the task. 
diff --git a/airflow/providers/amazon/aws/triggers/ecs.py b/airflow/providers/amazon/aws/triggers/ecs.py index 1177aa657a..dd86899f22 100644 --- a/airflow/providers/amazon/aws/triggers/ecs.py +++ b/airflow/providers/amazon/aws/triggers/ecs.py @@ -179,7 +179,9 @@ class TaskDoneTrigger(BaseTrigger): cluster=self.cluster, tasks=[self.task_arn], WaiterConfig={"MaxAttempts": 1} ) # we reach this point only if the waiter met a success criteria -yield TriggerEvent({"status": "success", "task_arn": self.task_arn}) +yield TriggerEvent( +{"status": "success", "task_arn": self.task_arn, "cluster": self.cluster} +) return except WaiterError as error: if "terminal failure" in str(error): diff --git a/tests/providers/amazon/aws/operators/test_ecs.py b/tests/providers/amazon/aws/operators/test_ecs.py index 9bccd22f9e..a6915214a0 100644 --- a/tests/providers/amazon/aws/operators/test_ecs.py +++ b/tests/providers/amazon/aws/operators/test_ecs.py @@ -674,13 +674,13 @@ class TestEcsRunTaskOperator(EcsBaseTestCase): @mock.patch.object(EcsRunTaskOperator, "client", new_callable=PropertyMock) def test_execute_complete(self, client_mock): -event = {"status": "success", "task_arn": "my_arn"} +event = {"status": "success", "task_arn": "my_arn", "cluster": "test_cluster"} self.ecs.reattach = True self.ecs.execute_complete(None, event) # task gets described to assert its success -client_mock().describe_tasks.assert_called_once_with(cluster="c", tasks=["my_arn"]) + client_mock().describe_tasks.assert_called_once_with(cluster="test_cluster", tasks=["my_arn"]) @pytest.mark.db_test @pytest.mark.parametrize( diff --git a/tests/providers/amazon/aws/triggers/test_ecs.py b/tests/providers/amazon/aws/triggers/test_ecs.py index 27c815be82..a5c11d9298 100644 --- a/tests/providers/amazon/aws/triggers/test_ecs.py +++ b/tests/providers/amazon/aws/triggers/test_ecs.py @@ -92,3 +92,4 @@ class TestTaskDoneTrigger: assert response.payload["status"] == "success" assert response.payload["task_arn"] == 
"my_task_arn" +assert response.payload["cluster"] == "cluster"
(airflow) branch main updated: Fix typos and grammatical errors in INSTALL documentation (#40417)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 5f2da71bf8 Fix typos and grammatical errors in INSTALL documentation (#40417) 5f2da71bf8 is described below commit 5f2da71bf86f1f73acd802b66b979e77f53c8715 Author: Yoonji Heo <11150+myeu...@users.noreply.github.com> AuthorDate: Wed Jun 26 03:02:45 2024 +0900 Fix typos and grammatical errors in INSTALL documentation (#40417) Corrected several typographical and grammatical errors in the INSTALL documentation. These changes improve the clarity and readability of the instructions. Specific changes include correcting "CPyhon" to "CPython" and standardizing the capitalization of "Airflow". --- INSTALL | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/INSTALL b/INSTALL index 1ed3e7c5c3..78506f9a57 100644 --- a/INSTALL +++ b/INSTALL @@ -3,8 +3,8 @@ INSTALL / BUILD instructions for Apache Airflow Basic installation of Airflow from sources and development environment setup -This is a generic installation method that requires minimum standard tools to develop airflow and -test it in a local virtual environment (using standard CPyhon installation and `pip`). +This is a generic installation method that requires minimum standard tools to develop Airflow and +test it in a local virtual environment (using standard CPython installation and `pip`). 
Depending on your system, you might need different prerequisites, but the following systems/prerequisites are known to work: @@ -61,7 +61,7 @@ Once you have a suitable Python version installed, you can create a virtualenv a ## Installing Airflow locally -Installing airflow locally can be done using pip - note that this will install "development" version of +Installing Airflow locally can be done using pip - note that this will install "development" version of Airflow, where all providers are installed from local sources (if available), not from `pypi`. It will also not include pre-installed providers installed from PyPI. If you install from sources of just Airflow, you need to install separately each provider you want to develop. If you install
(airflow) branch main updated (3f3bb463ae -> d5fb711ae0)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 3f3bb463ae Update backfill tests to use executor loader (#40285) add d5fb711ae0 Add Amazon Comprehend Document Classifier (#40287) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/hooks/comprehend.py | 33 +++ .../providers/amazon/aws/operators/comprehend.py | 149 - airflow/providers/amazon/aws/sensors/comprehend.py | 113 +- .../providers/amazon/aws/triggers/comprehend.py| 36 +++ .../providers/amazon/aws/waiters/comprehend.json | 55 + .../operators/comprehend.rst | 29 +++ .../providers/amazon/aws/hooks/test_comprehend.py | 104 + .../amazon/aws/operators/test_comprehend.py| 98 + .../amazon/aws/sensors/test_comprehend.py | 101 + .../amazon/aws/triggers/test_comprehend.py | 40 +++- .../amazon/aws/waiters/test_comprehend.py | 33 +++ .../aws/example_comprehend_document_classifier.py | 242 + 12 files changed, 1030 insertions(+), 3 deletions(-) create mode 100644 tests/system/providers/amazon/aws/example_comprehend_document_classifier.py
(airflow) branch main updated (1f7be6f63e -> f5d2745909)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 1f7be6f63e Change the pre-commit installing command to use pipx instead of pyenv (#40266) add f5d2745909 Resolve appflow deprecations in tests (#40298) No new revisions were added by this update. Summary of changes: tests/deprecations_ignore.yml| 2 -- tests/providers/amazon/aws/operators/test_appflow.py | 10 -- 2 files changed, 8 insertions(+), 4 deletions(-)
(airflow) branch main updated (c2a93eabd1 -> 835f28c8b9)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from c2a93eabd1 Update AWS Executor documentation (#39920) add 835f28c8b9 Lazy match escaped quotes in `RedshiftToS3Operator` (#40206) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/transfers/redshift_to_s3.py| 2 +- tests/providers/amazon/aws/transfers/test_redshift_to_s3.py | 6 ++ 2 files changed, 7 insertions(+), 1 deletion(-)
(airflow) branch main updated: Update AWS Executor documentation (#39920)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new c2a93eabd1 Update AWS Executor documentation (#39920) c2a93eabd1 is described below commit c2a93eabd1cfe60484f6c74c1feef65731bea8bf Author: Maham Ali <50532268+maha...@users.noreply.github.com> AuthorDate: Wed Jun 12 12:51:28 2024 -0700 Update AWS Executor documentation (#39920) --- .../executors/batch-executor.rst | 2 +- .../executors/ecs-executor.rst | 284 ++--- .../executors/general.rst | 11 + 3 files changed, 33 insertions(+), 264 deletions(-) diff --git a/docs/apache-airflow-providers-amazon/executors/batch-executor.rst b/docs/apache-airflow-providers-amazon/executors/batch-executor.rst index d32b6abfd9..a702e69fda 100644 --- a/docs/apache-airflow-providers-amazon/executors/batch-executor.rst +++ b/docs/apache-airflow-providers-amazon/executors/batch-executor.rst @@ -19,7 +19,7 @@ .. warning:: The Batch Executor is alpha/experimental at the moment and may be subject to change without warning. .. |executorName| replace:: Batch -.. |dockerfileLink| replace:: `here <https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/executors/batch/Dockerfile>`__ +.. |dockerfileLink| replace:: `here <https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/executors/Dockerfile>`__ .. |configKwargs| replace:: SUBMIT_JOB_KWARGS == diff --git a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst index d8d3764f5e..d4289e629a 100644 --- a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst +++ b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst @@ -19,6 +19,9 @@ .. warning:: The ECS Executor is alpha/experimental at the moment and may be subject to change without warning. +.. |executorName| replace:: ECS +.. 
|dockerfileLink| replace:: `here <https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/executors/Dockerfile>`__ +.. |configKwargs| replace:: SUBMIT_JOB_KWARGS AWS ECS Executor @@ -121,32 +124,10 @@ provider package. .. _dockerfile_for_ecs_executor: -Dockerfile for ECS Executor +.. include:: general.rst + :start-after: .. BEGIN DOCKERFILE + :end-before: .. END DOCKERFILE -An example Dockerfile can be found `here <https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/executors/ecs/Dockerfile>`__, it creates an -image that can be used on an ECS container to run Airflow tasks using -the AWS ECS Executor in Apache Airflow. The image supports AWS CLI/API -integration, allowing you to interact with AWS services within your -Airflow environment. It also includes options to load DAGs (Directed -Acyclic Graphs) from either an S3 bucket or a local folder. - -Download this image to use for the docker build commands below or create -your own image if you prefer. - -Prerequisites -~ - -Docker must be installed on your system. Instructions for installing -Docker can be found `here <https://docs.docker.com/get-docker/>`__. - -Building an Image -~ - -The `AWS CLI <https://aws.amazon.com/cli/>`__ will be installed within the -image, and there are multiple ways to pass AWS authentication -information to the container and thus multiple ways to build the image. -This guide will cover 2 methods. The most secure method is to use IAM roles. When creating an ECS Task Definition, you are able to select a Task Role and a Task Execution @@ -180,169 +161,15 @@ below: When creating the Task Definition for the ECS cluster (see the :ref:`setup guide ` for more details), select the appropriate newly created Task Role and Task Execution role for the Task Definition. -Then you can build your image by ``cd``-ing to the directory with the Dockerfile and running: - -.. 
code-block:: bash - - docker build -t my-airflow-image \ ---build-arg aws_default_region=YOUR_DEFAULT_REGION . - - -The second method is to use the build-time arguments -(``aws_access_key_id``, ``aws_secret_access_key``, -``aws_default_region``, and ``aws_session_token``). - -Note: This method is not recommended for use in production environments, -because user credentials are stored in the container, which may be a -security vulnerability. - -To pass AWS authentication information using these arguments, use the -``--build-arg`` option during the Docker build process. For example: - -.. code-block:: bash - - docker build -t my-airflow-image \ ---build-arg aws_access_key_id=YOUR_ACCESS_KEY \ ---build-arg aws_secret_access_key=YOUR_SECRET_KEY \ ---build-arg aws_default_reg
(airflow) branch main updated: Add link to Google Providers Package System Tests Public Dashboard (#40102)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 52f858c362 Add link to Google Providers Package System Tests Public Dashboard (#40102) 52f858c362 is described below commit 52f858c36278d0f45ece77721ed36107e298a185 Author: Freddy Demiane AuthorDate: Tue Jun 11 21:37:47 2024 +0200 Add link to Google Providers Package System Tests Public Dashboard (#40102) --- tests/system/providers/google/README.md | 4 1 file changed, 4 insertions(+) diff --git a/tests/system/providers/google/README.md b/tests/system/providers/google/README.md index da8709cb90..14e0a6a320 100644 --- a/tests/system/providers/google/README.md +++ b/tests/system/providers/google/README.md @@ -97,3 +97,7 @@ Keep in mind that some additional commands may be required. Some tests may require extra setup. If this is the case, the steps should be documented inside the docstring of related test file. + +## Dashboard + +To check the status of the system tests against the head revision of Apache Airflow, please refer to this [dashboard](https://storage.googleapis.com/providers-dashboard-html/dashboard.html).
(airflow) branch main updated: Aip-61: Add validation on task `executor` field (#40030)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new d504bfa5d1 Aip-61: Add validation on task `executor` field (#40030) d504bfa5d1 is described below commit d504bfa5d148e0d9470375c1c9d35c117deebc31 Author: Syed Hussain <103602455+syeda...@users.noreply.github.com> AuthorDate: Mon Jun 10 10:18:55 2024 -0700 Aip-61: Add validation on task `executor` field (#40030) --- airflow/executors/executor_loader.py | 4 ++-- airflow/models/dag.py| 14 ++ tests/models/test_dag.py | 23 +++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index b8b886954b..d433ef183e 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -25,7 +25,7 @@ from contextlib import suppress from typing import TYPE_CHECKING from airflow.api_internal.internal_api_call import InternalApiConfig -from airflow.exceptions import AirflowConfigException, AirflowException +from airflow.exceptions import AirflowConfigException from airflow.executors.executor_constants import ( CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR, @@ -206,7 +206,7 @@ class ExecutorLoader: elif executor_name := _classname_to_executors.get(executor_name_str): return executor_name else: -raise AirflowException(f"Unknown executor being loaded: {executor_name_str}") +raise ValueError(f"Unknown executor being loaded: {executor_name_str}") @classmethod def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor: diff --git a/airflow/models/dag.py b/airflow/models/dag.py index b91c08317f..1bf812abda 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -94,6 +94,7 @@ from airflow.exceptions import ( TaskDeferred, TaskNotFound, ) +from airflow.executors.executor_loader import ExecutorLoader from 
airflow.jobs.job import run_job from airflow.models.abstractoperator import AbstractOperator, TaskStateChangeCallback from airflow.models.base import Base, StringID @@ -800,10 +801,23 @@ class DAG(LoggingMixin): f"inconsistent schedule: timetable {self.timetable.summary!r} " f"does not match schedule_interval {self.schedule_interval!r}", ) +self.validate_executor_field() self.validate_schedule_and_params() self.timetable.validate() self.validate_setup_teardown() +def validate_executor_field(self): +for task in self.tasks: +if task.executor: +try: +ExecutorLoader.lookup_executor_name_by_str(task.executor) +except ValueError: +raise ValueError( +f"The specified executor {task.executor} for task {task.task_id} is not " +"configured. Review the core.executors Airflow configuration to add it or " +"update the executor configuration for this task." +) + def validate_setup_teardown(self): """ Validate that setup and teardown tasks are configured properly. diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 9207b3557c..6d931ffbdc 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -59,6 +59,7 @@ from airflow.models.dag import ( DagModel, DagOwnerAttributes, DagTag, +ExecutorLoader, dag as dag_decorator, get_dataset_triggered_next_run_info, ) @@ -2761,6 +2762,28 @@ my_postgres_conn: dag.access_control = outdated_permissions assert dag.access_control == updated_permissions +def test_validate_executor_field_executor_not_configured(self): +dag = DAG( +"test-dag", +schedule=None, +) + +EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor") +with pytest.raises( +ValueError, match="The specified executor test.custom.executor for task t1 is not configured" +): +dag.validate() + +def test_validate_executor_field(self): +with patch.object(ExecutorLoader, "lookup_executor_name_by_str"): +dag = DAG( +"test-dag", +schedule=None, +) + +EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor") +dag.validate() + def 
test_validate_params_on_trigger_dag(self): dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")}) with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"):
(airflow) branch main updated: Resolve aws provider deprecations in tests (#40123)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 1a7a67f134 Resolve aws provider deprecations in tests (#40123) 1a7a67f134 is described below commit 1a7a67f134e6ed8177a074d73168a6853c04641f Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Mon Jun 10 20:21:55 2024 +0530 Resolve aws provider deprecations in tests (#40123) --- tests/deprecations_ignore.yml | 4 -- .../amazon/aws/operators/test_base_aws.py | 5 +- tests/providers/amazon/aws/operators/test_batch.py | 36 +++ tests/providers/amazon/aws/operators/test_eks.py | 53 -- .../providers/amazon/aws/sensors/test_base_aws.py | 5 +- 5 files changed, 74 insertions(+), 29 deletions(-) diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index 4eb460674c..3544bbc786 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -99,14 +99,10 @@ - tests/providers/amazon/aws/deferrable/hooks/test_redshift_cluster.py::TestRedshiftAsyncHook::test_resume_cluster_exception - tests/providers/amazon/aws/operators/test_appflow.py::test_base_aws_op_attributes - tests/providers/amazon/aws/operators/test_appflow.py::test_run -- tests/providers/amazon/aws/operators/test_base_aws.py::TestAwsBaseOperator::test_conflicting_region_name -- tests/providers/amazon/aws/operators/test_batch.py::TestBatchOperator::test_override_not_sent_if_not_set -- tests/providers/amazon/aws/operators/test_eks.py::TestEksPodOperator::test_on_finish_action_handler - tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_conn_value_broken_field_mode - tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_conn_value_broken_field_mode_extra_words_added - 
tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_connection_broken_field_mode_extra_allows_nested_json - tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_connection_broken_field_mode_url_encoding -- tests/providers/amazon/aws/sensors/test_base_aws.py::TestAwsBaseSensor::test_conflicting_region_name - tests/providers/amazon/aws/triggers/test_redshift_cluster.py::TestRedshiftClusterTrigger::test_redshift_cluster_sensor_trigger_exception - tests/providers/amazon/aws/triggers/test_redshift_cluster.py::TestRedshiftClusterTrigger::test_redshift_cluster_sensor_trigger_resuming_status - tests/providers/amazon/aws/triggers/test_redshift_cluster.py::TestRedshiftClusterTrigger::test_redshift_cluster_sensor_trigger_success diff --git a/tests/providers/amazon/aws/operators/test_base_aws.py b/tests/providers/amazon/aws/operators/test_base_aws.py index 687a5ba3c0..ab5f07e429 100644 --- a/tests/providers/amazon/aws/operators/test_base_aws.py +++ b/tests/providers/amazon/aws/operators/test_base_aws.py @@ -144,7 +144,10 @@ class TestAwsBaseOperator: def test_conflicting_region_name(self): error_match = r"Conflicting `region_name` provided, region_name='us-west-1', region='eu-west-1'" -with pytest.raises(ValueError, match=error_match): +with pytest.raises(ValueError, match=error_match), pytest.warns( +AirflowProviderDeprecationWarning, +match="`region` is deprecated and will be removed in the future. 
Please use `region_name` instead.", +): FakeS3Operator( task_id="fake-task-id", aws_conn_id=TEST_CONN, diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index 27f86e279c..137044a212 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -307,16 +307,32 @@ class TestBatchOperator: in the API call (which would create a validation error from boto) """ override_arg = {override: {"a": "a"}} -batch = BatchOperator( -task_id="task", -job_name=JOB_NAME, -job_queue="queue", -job_definition="hello-world", -**override_arg, -# setting those to bypass code that is not relevant here -do_xcom_push=False, -wait_for_completion=False, -) +if override == "overrides": +with pytest.warns( +AirflowProviderDeprecationWarning, +match="Parameter `overrides` is deprecated, Please use `container_overrides` instead.", +): +batch = BatchOperator( +
(airflow) branch main updated: Resolve pagerduty deprecations in tests (#39945)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 21dae6ee39 Resolve pagerduty deprecations in tests (#39945) 21dae6ee39 is described below commit 21dae6ee39e7d7fc09a7f1495c8c243ee9d09b49 Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Tue Jun 4 23:18:19 2024 +0530 Resolve pagerduty deprecations in tests (#39945) --- tests/deprecations_ignore.yml | 3 --- tests/providers/pagerduty/hooks/test_pagerduty.py | 31 ++ .../pagerduty/hooks/test_pagerduty_events.py | 14 ++ 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index b75db24f60..2752ccd85e 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -466,9 +466,6 @@ - tests/providers/oracle/hooks/test_oracle.py::TestOracleHookConn::test_set_thick_mode_params - tests/providers/oracle/hooks/test_oracle.py::TestOracleHookConn::test_thick_mode_defaults_to_false - tests/providers/oracle/hooks/test_oracle.py::TestOracleHookConn::test_thick_mode_dirs_defaults -- tests/providers/pagerduty/hooks/test_pagerduty.py::TestPagerdutyHook::test_create_event -- tests/providers/pagerduty/hooks/test_pagerduty.py::TestPagerdutyHook::test_create_event_override -- tests/providers/pagerduty/hooks/test_pagerduty_events.py::TestPagerdutyEventsHook::test_create_event - tests/providers/postgres/hooks/test_postgres.py::TestPostgresHookConn::test_schema_kwarg_database_kwarg_compatibility - tests/providers/postgres/operators/test_postgres.py::test_parameters_are_templatized - tests/providers/postgres/operators/test_postgres.py::TestPostgres::test_overwrite_database diff --git a/tests/providers/pagerduty/hooks/test_pagerduty.py b/tests/providers/pagerduty/hooks/test_pagerduty.py index f6df40143a..f4ea355665 100644 --- 
a/tests/providers/pagerduty/hooks/test_pagerduty.py +++ b/tests/providers/pagerduty/hooks/test_pagerduty.py @@ -21,6 +21,7 @@ from unittest import mock import pytest +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import Connection from airflow.providers.pagerduty.hooks.pagerduty import PagerdutyHook from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsHook @@ -84,11 +85,15 @@ class TestPagerdutyHook: def test_create_event(self, events_hook_create_event, events_hook_init): events_hook_init.return_value = None hook = PagerdutyHook(pagerduty_conn_id=DEFAULT_CONN_ID) -hook.create_event( -summary="test", -source="airflow_test", -severity="error", -) +with pytest.warns( +AirflowProviderDeprecationWarning, +match="This method will be deprecated. Please use the `airflow.providers.pagerduty.hooks.PagerdutyEventsHook` to interact with the Events API", +): +hook.create_event( +summary="test", +source="airflow_test", +severity="error", +) events_hook_init.assert_called_with(integration_key="integration_key") events_hook_create_event.assert_called_with( summary="test", @@ -109,10 +114,14 @@ class TestPagerdutyHook: def test_create_event_override(self, events_hook_init): events_hook_init.return_value = None hook = PagerdutyHook(pagerduty_conn_id=DEFAULT_CONN_ID) -hook.create_event( -routing_key="different_key", -summary="test", -source="airflow_test", -severity="error", -) +with pytest.warns( +AirflowProviderDeprecationWarning, +match="This method will be deprecated. 
Please use the `airflow.providers.pagerduty.hooks.PagerdutyEventsHook` to interact with the Events API", +): +hook.create_event( +routing_key="different_key", +summary="test", +source="airflow_test", +severity="error", +) events_hook_init.assert_called_with(integration_key="different_key") diff --git a/tests/providers/pagerduty/hooks/test_pagerduty_events.py b/tests/providers/pagerduty/hooks/test_pagerduty_events.py index 63c60efc5d..3a5df3b616 100644 --- a/tests/providers/pagerduty/hooks/test_pagerduty_events.py +++ b/tests/providers/pagerduty/hooks/test_pagerduty_events.py @@ -51,11 +51,15 @@ class TestPagerdutyEventsHook: "dedup_key": "samplekeyhere", } r
(airflow) branch main updated: Adding Glue Data Quality Rule Recommendation Run (#40014)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new ea682382bc Adding Glue Data Quality Rule Recommendation Run (#40014) ea682382bc is described below commit ea682382bc3570a820d400994114e8b0060add66 Author: GPK AuthorDate: Tue Jun 4 18:45:26 2024 +0100 Adding Glue Data Quality Rule Recommendation Run (#40014) --- airflow/providers/amazon/aws/hooks/glue.py | 23 airflow/providers/amazon/aws/operators/glue.py | 146 + airflow/providers/amazon/aws/sensors/glue.py | 125 +- airflow/providers/amazon/aws/triggers/glue.py | 35 + airflow/providers/amazon/aws/waiters/glue.json | 49 +++ .../operators/glue.rst | 28 tests/providers/amazon/aws/hooks/test_glue.py | 21 +++ tests/providers/amazon/aws/operators/test_glue.py | 108 +++ .../amazon/aws/sensors/test_glue_data_quality.py | 138 ++- tests/providers/amazon/aws/triggers/test_glue.py | 28 tests/providers/amazon/aws/waiters/test_glue.py| 38 +- .../amazon/aws/example_glue_data_quality.py| 2 +- ...ample_glue_data_quality_with_recommendation.py} | 57 13 files changed, 764 insertions(+), 34 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py index f81dde2d11..08a96836da 100644 --- a/airflow/providers/amazon/aws/hooks/glue.py +++ b/airflow/providers/amazon/aws/hooks/glue.py @@ -530,3 +530,26 @@ class GlueDataQualityHook(AwsBaseHook): raise AirflowException( "AWS Glue data quality ruleset evaluation run failed for one or more rules" ) + +def log_recommendation_results(self, run_id: str) -> None: +""" +Print the outcome of recommendation run, recommendation run generates multiple rules against a data source (Glue table) in Data Quality Definition Language (DQDL) format. 
+ +Rules = [ +IsComplete "NAME", +ColumnLength "EMP_ID" between 1 and 12, +IsUnique "EMP_ID", +ColumnValues "INCOME" > 5 +] +""" +result = self.conn.get_data_quality_rule_recommendation_run(RunId=run_id) + +if result.get("RecommendedRuleset"): +self.log.info( +"AWS Glue data quality recommended rules for DatabaseName: %s TableName: %s", +result["DataSource"]["GlueTable"]["DatabaseName"], +result["DataSource"]["GlueTable"]["TableName"], +) +self.log.info(result["RecommendedRuleset"]) +else: +self.log.info("AWS Glue data quality, no recommended rules available for RunId: %s", run_id) diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py index cd681147bb..90c8f5b30f 100644 --- a/airflow/providers/amazon/aws/operators/glue.py +++ b/airflow/providers/amazon/aws/operators/glue.py @@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator from airflow.providers.amazon.aws.triggers.glue import ( +GlueDataQualityRuleRecommendationRunCompleteTrigger, GlueDataQualityRuleSetEvaluationRunCompleteTrigger, GlueJobCompleteTrigger, ) @@ -499,3 +500,148 @@ class GlueDataQualityRuleSetEvaluationRunOperator(AwsBaseOperator[GlueDataQualit ) return event["evaluation_run_id"] + + +class GlueDataQualityRuleRecommendationRunOperator(AwsBaseOperator[GlueDataQualityHook]): +""" +Starts a recommendation run that is used to generate rules, Glue Data Quality analyzes the data and comes up with recommendations for a potential ruleset. + +Recommendation runs are automatically deleted after 90 days. + +.. seealso:: +For more information on how to use this operator, take a look at the guide: +:ref:`howto/operator:GlueDataQualityRuleRecommendationRunOperator` + +:param datasource: The data source (Glue table) associated with this run. 
(templated) +:param role: IAM role supplied for job execution. (templated) +:param number_of_workers: The number of G.1X workers to be used in the run. (default: 5) +:param timeout: The timeout for a run in minutes. This is the maximum time that a run can consume resources +before it is terminated and enters TIMEOUT status. (default: 2,880) +:param show_results: Displays the re
(airflow) branch main updated: Update index.rst (#40040)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 32cf8cf37d Update index.rst (#40040) 32cf8cf37d is described below commit 32cf8cf37d57b2d414a8c1dc2ac73449a4cbacd8 Author: bangjiehan AuthorDate: Wed Jun 5 01:38:03 2024 +0800 Update index.rst (#40040) Remove the word "docker-compose" in PyPI section. --- docs/apache-airflow/installation/index.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/installation/index.rst b/docs/apache-airflow/installation/index.rst index 9f9f76cd27..d177ce3a79 100644 --- a/docs/apache-airflow/installation/index.rst +++ b/docs/apache-airflow/installation/index.rst @@ -141,7 +141,7 @@ More details: :doc:`/installation/installing-from-pypi` diagnose and solve. * You have :doc:`/start` where you can see an example of Quick Start with running Airflow locally which you can use to start Airflow quickly for local testing and development. - However, this is just for inspiration. Do not expect this docker-compose is ready for production installation, + However, this is just for inspiration. Do not expect :doc:`/start` is ready for production installation, you need to build your own production-ready deployment if you follow this approach. **Where to ask for help**
(airflow) branch main updated (914bccc387 -> 1ec3b39f28)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 914bccc387 Improve trigger UI for string array format validation (#39993) add 1ec3b39f28 Resolve www views deprecations in tests (#40009) No new revisions were added by this update. Summary of changes: tests/deprecations_ignore.yml | 36 -- tests/www/test_utils.py| 4 ++- tests/www/views/test_views_cluster_activity.py | 6 +++-- tests/www/views/test_views_grid.py | 3 ++- tests/www/views/test_views_tasks.py| 2 +- 5 files changed, 10 insertions(+), 41 deletions(-)
(airflow) branch main updated (4849fefb50 -> dbb28d801c)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 4849fefb50 Resolve aws provider deprecations in tests (#40026) add dbb28d801c Resolve aws ecs deprecations in tests (#40016) No new revisions were added by this update. Summary of changes: tests/deprecations_ignore.yml| 30 tests/providers/amazon/aws/operators/test_ecs.py | 10 2 files changed, 4 insertions(+), 36 deletions(-)
(airflow) branch main updated (19c145c9ef -> 4849fefb50)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 19c145c9ef Resolve aws emr deprecations in tests (#40020) add 4849fefb50 Resolve aws provider deprecations in tests (#40026) No new revisions were added by this update. Summary of changes: tests/deprecations_ignore.yml | 6 - tests/providers/amazon/aws/hooks/test_sagemaker.py | 28 +++--- tests/providers/amazon/aws/sensors/test_ecs.py | 6 ++--- .../amazon/aws/utils/test_connection_wrapper.py| 13 +- 4 files changed, 29 insertions(+), 24 deletions(-)
(airflow) branch main updated (b5bb039811 -> 19c145c9ef)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from b5bb039811 Fix bug that makes `AirflowSecurityManagerV2` leave transactions in the `idle in transaction` state (#39935) add 19c145c9ef Resolve aws emr deprecations in tests (#40020) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/operators/emr.py | 4 +- tests/deprecations_ignore.yml | 19 --- .../aws/operators/test_emr_notebook_execution.py | 8 +- .../amazon/aws/operators/test_emr_serverless.py| 164 +++-- 4 files changed, 121 insertions(+), 74 deletions(-)
(airflow) branch main updated (651a6d6a68 -> b5bb039811)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 651a6d6a68 standardizes template fields for `BaseSQLOperator` and adds `database` as a templated field (#39826) add b5bb039811 Fix bug that makes `AirflowSecurityManagerV2` leave transactions in the `idle in transaction` state (#39935) No new revisions were added by this update. Summary of changes: airflow/www/security_manager.py| 10 -- tests/www/test_security_manager.py | 31 +++ 2 files changed, 35 insertions(+), 6 deletions(-)
(airflow) branch main updated: Resolve triggerer job logging deprecations in tests (#39962)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 936c3892af Resolve triggerer job logging deprecations in tests (#39962) 936c3892af is described below commit 936c3892afe63a9926e42c8dc217979a4b6af0d7 Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Fri May 31 22:26:17 2024 +0530 Resolve triggerer job logging deprecations in tests (#39962) --- tests/deprecations_ignore.yml| 3 --- tests/jobs/test_triggerer_job_logging.py | 3 --- 2 files changed, 6 deletions(-) diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index d53d722fd3..6be9074056 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -57,9 +57,6 @@ - tests/jobs/test_backfill_job.py::TestBackfillJob::test_reset_orphaned_tasks_with_orphans - tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear - tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters -- tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_fallback_task -- tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_root_not_file_task -- tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_root_old_file_task - tests/models/test_cleartasks.py::TestClearTasks::test_dags_clear - tests/models/test_dag.py::TestDag::test_bulk_write_to_db_interval_save_runtime - tests/models/test_dag.py::TestDag::test_bulk_write_to_db_max_active_runs diff --git a/tests/jobs/test_triggerer_job_logging.py b/tests/jobs/test_triggerer_job_logging.py index 6d6d3863a6..a039c43fb4 100644 --- a/tests/jobs/test_triggerer_job_logging.py +++ b/tests/jobs/test_triggerer_job_logging.py @@ -218,7 +218,6 @@ fallback_task = { "class": 
"airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler", "base_log_folder": "~/abc", "s3_log_folder": "s3://abc", -"filename_template": "blah", }, }, "loggers": {"airflow.task": {"handlers": ["task"]}}, @@ -318,7 +317,6 @@ root_not_file_task = { "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler", "base_log_folder": "~/abc", "s3_log_folder": "s3://abc", -"filename_template": "blah", }, "trigger": {"class": "logging.Handler"}, }, @@ -379,7 +377,6 @@ root_logger_old_file_task = { "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler", "base_log_folder": "~/abc", "s3_log_folder": "s3://abc", -"filename_template": "blah", }, "trigger": { "class": "tests.jobs.test_triggerer_job_logging.OldFileTaskHandler",
(airflow) branch main updated: Adding Amazon Glue Data Quality Service (#39923)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 78523fdbf1 Adding Amazon Glue Data Quality Service (#39923) 78523fdbf1 is described below commit 78523fdbf1b80a7fbc7ec5e7b0b20f6934917898 Author: gopidesupavan AuthorDate: Fri May 31 15:50:22 2024 +0100 Adding Amazon Glue Data Quality Service (#39923) --- airflow/providers/amazon/aws/hooks/glue.py | 100 airflow/providers/amazon/aws/operators/glue.py | 264 - airflow/providers/amazon/aws/sensors/glue.py | 139 ++- airflow/providers/amazon/aws/triggers/glue.py | 43 +++- airflow/providers/amazon/aws/waiters/glue.json | 49 .../operators/glue.rst | 44 docs/spelling_wordlist.txt | 1 + tests/providers/amazon/aws/hooks/test_glue.py | 142 ++- tests/providers/amazon/aws/operators/test_glue.py | 246 ++- .../amazon/aws/sensors/test_glue_data_quality.py | 182 ++ tests/providers/amazon/aws/triggers/test_glue.py | 40 +++- tests/providers/amazon/aws/waiters/test_glue.py| 72 ++ .../amazon/aws/example_glue_data_quality.py| 210 13 files changed, 1519 insertions(+), 13 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py index a7edc10d12..f81dde2d11 100644 --- a/airflow/providers/amazon/aws/hooks/glue.py +++ b/airflow/providers/amazon/aws/hooks/glue.py @@ -20,6 +20,7 @@ from __future__ import annotations import asyncio import time from functools import cached_property +from typing import Any from botocore.exceptions import ClientError @@ -430,3 +431,102 @@ class GlueJobHook(AwsBaseHook): self.conn.create_job(**config) return self.job_name + + +class GlueDataQualityHook(AwsBaseHook): +""" +Interact with AWS Glue Data Quality. + +Provide thick wrapper around :external+boto3:py:class:`boto3.client("glue") `. 
+ +Additional arguments (such as ``aws_conn_id``) may be specified and +are passed down to the underlying AwsBaseHook. + +.. seealso:: +- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` +""" + +def __init__( +self, +*args, +**kwargs, +): +kwargs["client_type"] = "glue" +super().__init__(*args, **kwargs) + +def has_data_quality_ruleset(self, name: str) -> bool: +try: +self.conn.get_data_quality_ruleset(Name=name) +return True +except self.conn.exceptions.EntityNotFoundException: +return False + +def _log_results(self, result: dict[str, Any]) -> None: +""" +Print the outcome of evaluation run, An evaluation run can involve multiple rulesets evaluated against a data source (Glue table). + +NameDescription Result EvaluatedMetrics EvaluationMessage +Rule_1RowCount between 15 and 60 PASS {'Dataset.*.RowCount': 30.0} NaN +Rule_2IsComplete "marketplace" PASS {'Column.marketplace.Completeness': 1.0} NaN +Rule_3ColumnLength "marketplace" between 1 and 2 FAIL {'Column.marketplace.MaximumLength': 9.0, 'Column.marketplace.MinimumLength': 3.0} Value: 9.0 does not meet the constraint requirement! + +""" +import pandas as pd + +pd.set_option("display.max_rows", None) +pd.set_option("display.max_columns", None) +pd.set_option("display.width", None) +pd.set_option("display.max_colwidth", None) + +self.log.info( +"AWS Glue data quality ruleset evaluation result for RulesetName: %s RulesetEvaluationRunId: %s Score: %s", +result.get("RulesetName"), +result.get("RulesetEvaluationRunId"), +result.get("Score"), +) + +rule_results = result["RuleResults"] +rule_results_df = pd.DataFrame(rule_results) +self.log.info(rule_results_df) + +def get_evaluation_run_results(self, run_id: str) -> dict[str, Any]: +response = self.conn.get_data_quality_ruleset_evaluation_run(RunId=run_id) + +return self.conn.batch_get_data_quality_result(ResultIds=response["ResultIds"]) + +
(airflow) branch main updated: Allow user-specified object attributes to be used in check_fn for S3KeySensor (#39950)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new fcd1a26a9a Allow user-specified object attributes to be used in check_fn for S3KeySensor (#39950) fcd1a26a9a is described below commit fcd1a26a9a006ee8ee3ac023abb247f565d36e67 Author: ellisms <114107920+elli...@users.noreply.github.com> AuthorDate: Fri May 31 10:49:07 2024 -0400 Allow user-specified object attributes to be used in check_fn for S3KeySensor (#39950) * Ability to specify s3 object attributes for check_fn * removed unncessary size check * Update airflow/providers/amazon/aws/sensors/s3.py Co-authored-by: Vincent <97131062+vincb...@users.noreply.github.com> - Co-authored-by: Vincent <97131062+vincb...@users.noreply.github.com> --- airflow/providers/amazon/aws/sensors/s3.py| 40 +++- tests/providers/amazon/aws/sensors/test_s3.py | 141 ++ 2 files changed, 176 insertions(+), 5 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py index bb7105c3fb..adcdcbf010 100644 --- a/airflow/providers/amazon/aws/sensors/s3.py +++ b/airflow/providers/amazon/aws/sensors/s3.py @@ -78,6 +78,11 @@ class S3KeySensor(BaseSensorOperator): CA cert bundle than the one used by botocore. :param deferrable: Run operator in the deferrable mode :param use_regex: whether to use regex to check bucket +:param metadata_keys: List of head_object attributes to gather and send to ``check_fn``. +Acceptable values: Any top level attribute returned by s3.head_object. Specify * to return +all available attributes. +Default value: "Size". +If the requested attribute is not found, the key is still included and the value is None. 
""" template_fields: Sequence[str] = ("bucket_key", "bucket_name") @@ -93,6 +98,7 @@ class S3KeySensor(BaseSensorOperator): verify: str | bool | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), use_regex: bool = False, +metadata_keys: list[str] | None = None, **kwargs, ): super().__init__(**kwargs) @@ -104,14 +110,14 @@ class S3KeySensor(BaseSensorOperator): self.verify = verify self.deferrable = deferrable self.use_regex = use_regex +self.metadata_keys = metadata_keys if metadata_keys else ["Size"] def _check_key(self, key): bucket_name, key = S3Hook.get_s3_bucket_key(self.bucket_name, key, "bucket_name", "bucket_key") self.log.info("Poking for key : s3://%s/%s", bucket_name, key) """ -Set variable `files` which contains a list of dict which contains only the size -If needed we might want to add other attributes later +Set variable `files` which contains a list of dict which contains attributes defined by the user Format: [{ 'Size': int }] @@ -123,8 +129,21 @@ class S3KeySensor(BaseSensorOperator): if not key_matches: return False -# Reduce the set of metadata to size only -files = [{"Size": f["Size"]} for f in key_matches] +# Reduce the set of metadata to requested attributes +files = [] +for f in key_matches: +metadata = {} +if "*" in self.metadata_keys: +metadata = self.hook.head_object(f["Key"], bucket_name) +else: +for key in self.metadata_keys: +try: +metadata[key] = f[key] +except KeyError: +# supplied key might be from head_object response +self.log.info("Key %s not found in response, performing head_object", key) +metadata[key] = self.hook.head_object(f["Key"], bucket_name).get(key, None) +files.append(metadata) elif self.use_regex: keys = self.hook.get_file_metadata("", bucket_name) key_matches = [k for k in keys if re.match(pattern=key, string=k["Key"])] @@ -134,7 +153,18 @@ class S3KeySensor(BaseSensorOperator): obj = self.hook.head_object(key, bucket_name) if obj is None: return False -files = 
[{"Size": obj["ContentLength"]}] +metadata = {} +if "*" in se
(airflow) branch main updated (b7b6d1426f -> ddcc1b3a00)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from b7b6d1426f Resolve sshhook deprecations in tests (#39907) add ddcc1b3a00 Fix: remove process_func from templated_fields (#39948) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py | 1 - 1 file changed, 1 deletion(-)
(airflow) branch main updated (9f0b0258e2 -> 53081cd342)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 9f0b0258e2 Allow FTPHook to change port number (#39465) add 53081cd342 Implement amazon s3 to dynamodb transfer operator (#39654) No new revisions were added by this update. Summary of changes: airflow/providers/amazon/aws/hooks/dynamodb.py | 35 ++- .../amazon/aws/transfers/s3_to_dynamodb.py | 257 + airflow/providers/amazon/aws/waiters/dynamodb.json | 37 +++ airflow/providers/amazon/provider.yaml | 4 + .../{s3_to_redshift.rst => s3_to_dynamodb.rst} | 42 ++-- tests/providers/amazon/aws/hooks/test_dynamodb.py | 88 ++- .../amazon/aws/transfers/test_s3_to_dynamodb.py| 234 +++ tests/providers/amazon/aws/waiters/test_dynamo.py | 63 - .../providers/amazon/aws/example_s3_to_dynamodb.py | 192 +++ 9 files changed, 924 insertions(+), 28 deletions(-) create mode 100644 airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py copy docs/apache-airflow-providers-amazon/transfer/{s3_to_redshift.rst => s3_to_dynamodb.rst} (51%) create mode 100644 tests/providers/amazon/aws/transfers/test_s3_to_dynamodb.py create mode 100644 tests/system/providers/amazon/aws/example_s3_to_dynamodb.py
(airflow) branch main updated: Resolving EMR deprecated warnings (#39743)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new e565cea65c Resolving EMR deprecated warnings (#39743) e565cea65c is described below commit e565cea65cb42e43387aa7fd135ac46e8ac25f65 Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Thu May 23 19:30:57 2024 +0530 Resolving EMR deprecated warnings (#39743) --- airflow/providers/amazon/aws/operators/emr.py | 24 +--- tests/always/test_example_dags.py | 1 - 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 1a4518af70..664d4b6d84 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -742,10 +742,20 @@ class EmrCreateJobFlowOperator(BaseOperator): waiter_max_attempts: int | None = None, waiter_delay: int | None = None, waiter_countdown: int | None = None, -waiter_check_interval_seconds: int = 60, +waiter_check_interval_seconds: int | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs: Any, ): +if waiter_check_interval_seconds: +warnings.warn( +"The parameter `waiter_check_interval_seconds` has been deprecated to " +"standardize naming conventions. Please `use waiter_delay instead`. In the " +"future this will default to None and defer to the waiter's default value.", +AirflowProviderDeprecationWarning, +stacklevel=2, +) +else: +waiter_check_interval_seconds = 60 if waiter_countdown: warnings.warn( "The parameter waiter_countdown has been deprecated to standardize " @@ -757,15 +767,7 @@ class EmrCreateJobFlowOperator(BaseOperator): # waiter_countdown defaults to never timing out, which is not supported # by boto waiters, so we will set it here to "a very long time" for now. 
waiter_max_attempts = (waiter_countdown or 999) // waiter_check_interval_seconds -if waiter_check_interval_seconds: -warnings.warn( -"The parameter waiter_check_interval_seconds has been deprecated to " -"standardize naming conventions. Please use waiter_delay instead. In the " -"future this will default to None and defer to the waiter's default value.", -AirflowProviderDeprecationWarning, -stacklevel=2, -) -waiter_delay = waiter_check_interval_seconds + super().__init__(**kwargs) self.aws_conn_id = aws_conn_id self.emr_conn_id = emr_conn_id @@ -773,7 +775,7 @@ class EmrCreateJobFlowOperator(BaseOperator): self.region_name = region_name self.wait_for_completion = wait_for_completion self.waiter_max_attempts = waiter_max_attempts or 60 -self.waiter_delay = waiter_delay or 30 +self.waiter_delay = waiter_delay or waiter_check_interval_seconds or 60 self.deferrable = deferrable @cached_property diff --git a/tests/always/test_example_dags.py b/tests/always/test_example_dags.py index 2e75183be0..43104a884c 100644 --- a/tests/always/test_example_dags.py +++ b/tests/always/test_example_dags.py @@ -48,7 +48,6 @@ IGNORE_AIRFLOW_PROVIDER_DEPRECATION_WARNING: tuple[str, ...] = ( # If the deprecation is postponed, the item should be added to this tuple, # and a corresponding Issue should be created on GitHub. "tests/system/providers/amazon/aws/example_ecs_fargate.py", -"tests/system/providers/amazon/aws/example_emr.py", "tests/system/providers/amazon/aws/example_emr_notebook_execution.py", "tests/system/providers/google/cloud/bigquery/example_bigquery_operations.py", "tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py",
(airflow) branch main updated: Add metrics about task CPU and memory usage (#39650)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 9139b22944 Add metrics about task CPU and memory usage (#39650) 9139b22944 is described below commit 9139b22944984e9e96e5277ad7d930e78c49e473 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Wed May 22 13:27:19 2024 -0400 Add metrics about task CPU and memory usage (#39650) --- airflow/task/task_runner/standard_task_runner.py | 25 +++ .../logging-monitoring/metrics.rst | 2 ++ .../task/task_runner/test_standard_task_runner.py | 37 -- 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 00252acf42..5ecf1ad64c 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -21,6 +21,8 @@ from __future__ import annotations import logging import os +import threading +import time from typing import TYPE_CHECKING import psutil @@ -29,6 +31,7 @@ from setproctitle import setproctitle from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.models.taskinstance import TaskReturnCode from airflow.settings import CAN_FORK +from airflow.stats import Stats from airflow.task.task_runner.base_task_runner import BaseTaskRunner from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager from airflow.utils.process_utils import reap_process_group, set_new_process_group @@ -53,6 +56,11 @@ class StandardTaskRunner(BaseTaskRunner): else: self.process = self._start_by_exec() +if self.process: +resource_monitor = threading.Thread(target=self._read_task_utilization) +resource_monitor.daemon = True +resource_monitor.start() + def _start_by_exec(self) -> psutil.Process: subprocess = self.run_command() self.process = 
psutil.Process(subprocess.pid) @@ -186,3 +194,20 @@ class StandardTaskRunner(BaseTaskRunner): if self.process is None: raise RuntimeError("Process is not started yet") return self.process.pid + +def _read_task_utilization(self): +dag_id = self._task_instance.dag_id +task_id = self._task_instance.task_id + +try: +while True: +with self.process.oneshot(): +mem_usage = self.process.memory_percent() +cpu_usage = self.process.cpu_percent() + +Stats.gauge(f"task.mem_usage.{dag_id}.{task_id}", mem_usage) +Stats.gauge(f"task.cpu_usage.{dag_id}.{task_id}", cpu_usage) +time.sleep(5) +except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError): +self.log.info("Process not found (most likely exited), stop collecting metrics") +return diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index efe565094a..f95c3a981c 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -242,6 +242,8 @@ Name Description ``pool.scheduled_tasks``Number of scheduled tasks in the pool. Metric with pool_name tagging. ``pool.starving_tasks.<pool_name>`` Number of starving tasks in the pool ``pool.starving_tasks`` Number of starving tasks in the pool. Metric with pool_name tagging. +``task.cpu_usage_percent.<dag_id>.<task_id>`` Percentage of CPU used by a task +``task.mem_usage_percent.<dag_id>.<task_id>`` Percentage of memory used by a task ``triggers.running.<hostname>`` Number of triggers currently running for a triggerer (described by hostname) ``triggers.running``Number of triggers currently running for a triggerer (described by hostname). Metric with hostname tagging. 
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py index ab9b882e1f..381aefc7c1 100644 --- a/tests/task/task_runner/test_standard_task_runner.py +++ b/tests/task/task_runner/test_standard_task_runner.py @@ -28,6 +28,7 @@ from unittest.mock import patch import psutil import pytest +from airflow.exceptions import AirflowTaskTimeout from airflow.jobs.job import Job from airflow.jobs.local_task_job_runner import LocalTaskJobRunner from airflow.listen
(airflow) branch main updated: Fix automatic termination issue in `EmrOperator` by ensuring `waiter_max_attempts` is set for deferrable triggers (#38658)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 49b38719e2 Fix automatic termination issue in `EmrOperator` by ensuring `waiter_max_attempts` is set for deferrable triggers (#38658) 49b38719e2 is described below commit 49b38719e24bd4adb3354319aeacb87b87e79b9c Author: Hyunwoo (Jaden) Park AuthorDate: Tue May 21 14:45:45 2024 -0400 Fix automatic termination issue in `EmrOperator` by ensuring `waiter_max_attempts` is set for deferrable triggers (#38658) --- airflow/providers/amazon/aws/operators/emr.py| 8 airflow/providers/amazon/aws/sensors/emr.py | 8 .../amazon/aws/operators/test_emr_containers.py | 16 .../providers/amazon/aws/sensors/test_emr_containers.py | 14 ++ 4 files changed, 46 insertions(+) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index ec1be30f91..1a4518af70 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -617,6 +617,14 @@ class EmrContainerOperator(BaseOperator): job_id=self.job_id, aws_conn_id=self.aws_conn_id, waiter_delay=self.poll_interval, +waiter_max_attempts=self.max_polling_attempts, +) +if self.max_polling_attempts +else EmrContainerTrigger( +virtual_cluster_id=self.virtual_cluster_id, +job_id=self.job_id, +aws_conn_id=self.aws_conn_id, +waiter_delay=self.poll_interval, ), method_name="execute_complete", ) diff --git a/airflow/providers/amazon/aws/sensors/emr.py b/airflow/providers/amazon/aws/sensors/emr.py index 30a4c943c6..19e026e7a6 100644 --- a/airflow/providers/amazon/aws/sensors/emr.py +++ b/airflow/providers/amazon/aws/sensors/emr.py @@ -354,6 +354,14 @@ class EmrContainerSensor(BaseSensorOperator): job_id=self.job_id, aws_conn_id=self.aws_conn_id, waiter_delay=self.poll_interval, +waiter_max_attempts=self.max_retries, 
+) +if self.max_retries +else EmrContainerTrigger( +virtual_cluster_id=self.virtual_cluster_id, +job_id=self.job_id, +aws_conn_id=self.aws_conn_id, +waiter_delay=self.poll_interval, ), method_name="execute_complete", ) diff --git a/tests/providers/amazon/aws/operators/test_emr_containers.py b/tests/providers/amazon/aws/operators/test_emr_containers.py index 8e94e744d9..feeec1278e 100644 --- a/tests/providers/amazon/aws/operators/test_emr_containers.py +++ b/tests/providers/amazon/aws/operators/test_emr_containers.py @@ -144,6 +144,22 @@ class TestEmrContainerOperator: exc.value.trigger, EmrContainerTrigger ), f"{exc.value.trigger} is not a EmrContainerTrigger" +@mock.patch.object(EmrContainerHook, "submit_job") +@mock.patch.object( +EmrContainerHook, "check_query_status", return_value=EmrContainerHook.INTERMEDIATE_STATES[0] +) +def test_operator_defer_with_timeout(self, mock_submit_job, mock_check_query_status): +self.emr_container.deferrable = True +self.emr_container.max_polling_attempts = 1000 + +with pytest.raises(TaskDeferred) as e: +self.emr_container.execute(context=None) + +trigger = e.value.trigger +assert isinstance(trigger, EmrContainerTrigger), f"{trigger} is not a EmrContainerTrigger" +assert trigger.waiter_delay == self.emr_container.poll_interval +assert trigger.attempts == self.emr_container.max_polling_attempts + class TestEmrEksCreateClusterOperator: def setup_method(self): diff --git a/tests/providers/amazon/aws/sensors/test_emr_containers.py b/tests/providers/amazon/aws/sensors/test_emr_containers.py index 606281e70a..65ae072934 100644 --- a/tests/providers/amazon/aws/sensors/test_emr_containers.py +++ b/tests/providers/amazon/aws/sensors/test_emr_containers.py @@ -84,3 +84,17 @@ class TestEmrContainerSensor: assert isinstance( e.value.trigger, EmrContainerTrigger ), f"{e.value.trigger} is not a EmrContainerTrigger" + + @mock.patch("airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor.poke") +def test_sensor_defer_with_timeout(self, 
mock_poke): +self.sensor.deferrable = True +mock_poke.return_value = False +self.sensor.max_retries = 1000 +
(airflow) branch main updated (b7671ef5ab -> b41f429612)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from b7671ef5ab Re-configure ORM in spawned OpenLineage process in scheduler. (#39735) add b41f429612 Remove deprecations eks (#39709) No new revisions were added by this update. Summary of changes: tests/always/test_example_dags.py| 1 - tests/system/providers/amazon/aws/example_eks_with_nodegroups.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-)
(airflow) branch main updated: Introduce Amazon Comprehend Service (#39592)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 9dd77520be Introduce Amazon Comprehend Service (#39592) 9dd77520be is described below commit 9dd77520be3d8492156958d57b63b5779a3f55eb Author: gopidesupavan AuthorDate: Wed May 15 15:05:53 2024 +0100 Introduce Amazon Comprehend Service (#39592) --- airflow/providers/amazon/aws/hooks/comprehend.py | 37 .../providers/amazon/aws/operators/comprehend.py | 192 + airflow/providers/amazon/aws/sensors/comprehend.py | 147 .../providers/amazon/aws/triggers/comprehend.py| 61 +++ .../providers/amazon/aws/waiters/comprehend.json | 49 ++ airflow/providers/amazon/provider.yaml | 18 ++ .../operators/comprehend.rst | 74 .../aws/amazon-comprehend_light...@4x.png | Bin 0 -> 7254 bytes docs/spelling_wordlist.txt | 2 + tests/always/test_project_structure.py | 2 + .../providers/amazon/aws/hooks/test_comprehend.py | 31 .../amazon/aws/operators/test_comprehend.py| 163 + .../amazon/aws/sensors/test_comprehend.py | 94 ++ .../amazon/aws/triggers/test_comprehend.py | 67 +++ .../amazon/aws/waiters/test_comprehend.py | 71 .../providers/amazon/aws/example_comprehend.py | 137 +++ 16 files changed, 1145 insertions(+) diff --git a/airflow/providers/amazon/aws/hooks/comprehend.py b/airflow/providers/amazon/aws/hooks/comprehend.py new file mode 100644 index 00..897aaf72ee --- /dev/null +++ b/airflow/providers/amazon/aws/hooks/comprehend.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook + + +class ComprehendHook(AwsBaseHook): +""" +Interact with AWS Comprehend. + +Provide thin wrapper around :external+boto3:py:class:`boto3.client("comprehend") `. + +Additional arguments (such as ``aws_conn_id``) may be specified and +are passed down to the underlying AwsBaseHook. + +.. seealso:: +- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` +""" + +def __init__(self, *args, **kwargs) -> None: +kwargs["client_type"] = "comprehend" +super().__init__(*args, **kwargs) diff --git a/airflow/providers/amazon/aws/operators/comprehend.py b/airflow/providers/amazon/aws/operators/comprehend.py new file mode 100644 index 00..780e227af4 --- /dev/null +++ b/airflow/providers/amazon/aws/operators/comprehend.py @@ -0,0 +1,192 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook +from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.triggers.comprehend import ComprehendPiiEntitiesDetectio
(airflow) branch main updated (1489cf7a03 -> 339ea508e2)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 1489cf7a03 Fix deferrable mode for BeamRunJavaPipelineOperator (#39371) add 339ea508e2 Handle task adoption for batch executor (#39590) No new revisions were added by this update. Summary of changes: .../amazon/aws/executors/batch/batch_executor.py | 50 -- .../aws/executors/batch/test_batch_executor.py | 29 + 2 files changed, 76 insertions(+), 3 deletions(-)
(airflow) branch main updated (32e2006a07 -> 678d104751)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 32e2006a07 Remove from `deprecations_ignore.yml` already resolved warnings exclusions (#39369) add 678d104751 Remove FAB test side effects (#39360) No new revisions were added by this update. Summary of changes: .../fab/auth_manager/api_endpoints/test_user_endpoint.py | 3 ++- .../fab/auth_manager/cli_commands/test_role_command.py | 14 +++--- .../fab/auth_manager/cli_commands/test_user_command.py | 8 tests/providers/fab/auth_manager/views/test_permissions.py | 6 -- tests/providers/fab/auth_manager/views/test_roles_list.py | 6 -- tests/providers/fab/auth_manager/views/test_user.py| 6 -- tests/providers/fab/auth_manager/views/test_user_edit.py | 6 -- tests/providers/fab/auth_manager/views/test_user_stats.py | 6 -- 8 files changed, 33 insertions(+), 22 deletions(-)
(airflow) branch main updated: Simplify action name retrieval in FAB auth manager (#39358)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 959e52bf3c Simplify action name retrieval in FAB auth manager (#39358) 959e52bf3c is described below commit 959e52bf3c48ba1f2622187179fca28f908a859a Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> AuthorDate: Thu May 2 10:54:25 2024 -0400 Simplify action name retrieval in FAB auth manager (#39358) --- airflow/providers/fab/auth_manager/fab_auth_manager.py | 5 + 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airflow/providers/fab/auth_manager/fab_auth_manager.py b/airflow/providers/fab/auth_manager/fab_auth_manager.py index 547bc626bb..ffd5e5cab5 100644 --- a/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -272,10 +272,7 @@ class FabAuthManager(BaseAuthManager): ): if not user: user = self.get_user() -if method in get_fab_action_from_method_map(): -fab_action_name = get_fab_action_from_method_map()[method] -else: -fab_action_name = method +fab_action_name = get_fab_action_from_method_map().get(method, method) return (fab_action_name, resource_name) in self._get_user_permissions(user) @provide_session
(airflow) branch main updated: Add tests for `EmrServerlessJobSensor` and `EmrServerlessApplicationSensor` (#39099)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 6d09adf992 Add tests for `EmrServerlessJobSensor` and `EmrServerlessApplicationSensor` (#39099) 6d09adf992 is described below commit 6d09adf99262144c71bd15c09bc18f956667ccae Author: Mateus Latrova Stephanin <57113699+mateuslatr...@users.noreply.github.com> AuthorDate: Fri Apr 26 10:56:27 2024 -0300 Add tests for `EmrServerlessJobSensor` and `EmrServerlessApplicationSensor` (#39099) --- airflow/providers/amazon/aws/sensors/emr.py| 10 ++- .../aws/sensors/test_emr_serverless_application.py | 90 + .../amazon/aws/sensors/test_emr_serverless_job.py | 91 ++ 3 files changed, 189 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/emr.py b/airflow/providers/amazon/aws/sensors/emr.py index c5c22f97bc..30a4c943c6 100644 --- a/airflow/providers/amazon/aws/sensors/emr.py +++ b/airflow/providers/amazon/aws/sensors/emr.py @@ -24,7 +24,11 @@ from typing import TYPE_CHECKING, Any, Iterable, Sequence from deprecated import deprecated from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException +from airflow.exceptions import ( +AirflowException, +AirflowProviderDeprecationWarning, +AirflowSkipException, +) from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook from airflow.providers.amazon.aws.links.emr import EmrClusterLink, EmrLogsLink, get_log_uri from airflow.providers.amazon.aws.triggers.emr import ( @@ -231,7 +235,9 @@ class EmrServerlessApplicationSensor(BaseSensorOperator): if state in EmrServerlessHook.APPLICATION_FAILURE_STATES: # TODO: remove this if check when min_airflow_version is set to higher than 2.7.1 -failure_message = f"EMR Serverless job failed: 
{self.failure_message_from_response(response)}" +failure_message = ( +f"EMR Serverless application failed: {self.failure_message_from_response(response)}" +) if self.soft_fail: raise AirflowSkipException(failure_message) raise AirflowException(failure_message) diff --git a/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py b/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py new file mode 100644 index 00..c35d84e7fa --- /dev/null +++ b/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor + + +class TestEmrServerlessApplicationSensor: +def setup_method(self): +self.app_id = "vzwemreks" +self.job_run_id = "job1234" +self.sensor = EmrServerlessApplicationSensor( +task_id="test_emrcontainer_sensor", +application_id=self.app_id, +aws_conn_id="aws_default", +) + +def set_get_application_return_value(self, return_value: dict[str, str]): +self.mock_hook = MagicMock() +self.mock_hook.conn.get_application.return_value = return_value +self.sensor.hook = self.mock_hook + +def assert_get_application_was_called_once_with_app_id(self): + self.mock_hook.conn.get_application.assert_called_once_with(applicationId=self.app_id) + + +class TestPokeReturnValue(TestEmrServerlessApplicationSensor): +@pytest.mark.parametrize( +"state, expected_result", +[ +("CREATING", False), +("STARTING", False), +("STOPPING", False), +("CREATED", True)
(airflow) branch main updated (ead9b00f7c -> 7635ff3555)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from ead9b00f7c Bump minimum Airflow version in providers to Airflow 2.7.0 (#39240) add 7635ff3555 Remove plugins permissions from Viewer role (#39254) No new revisions were added by this update. Summary of changes: airflow/providers/fab/auth_manager/security_manager/override.py | 4 ++-- tests/providers/fab/auth_manager/test_security.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-)
(airflow) branch main updated: Update `is_authorized_custom_view` from auth manager to handle custom actions (#39167)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new f7a2f60325 Update `is_authorized_custom_view` from auth manager to handle custom actions (#39167) f7a2f60325 is described below commit f7a2f6032544defa8a00d1f7fa90e91d27eb3a8e Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Apr 22 12:36:26 2024 -0400 Update `is_authorized_custom_view` from auth manager to handle custom actions (#39167) --- airflow/auth/managers/base_auth_manager.py | 7 +-- airflow/providers/amazon/aws/auth_manager/avp/entities.py | 2 +- airflow/providers/amazon/aws/auth_manager/avp/facade.py | 7 +-- .../providers/amazon/aws/auth_manager/aws_auth_manager.py | 2 +- airflow/providers/fab/auth_manager/fab_auth_manager.py | 7 +-- tests/auth/managers/test_base_auth_manager.py | 2 +- tests/providers/fab/auth_manager/test_fab_auth_manager.py | 13 - 7 files changed, 30 insertions(+), 10 deletions(-) diff --git a/airflow/auth/managers/base_auth_manager.py b/airflow/auth/managers/base_auth_manager.py index 4d5c249235..7bb4e92889 100644 --- a/airflow/auth/managers/base_auth_manager.py +++ b/airflow/auth/managers/base_auth_manager.py @@ -237,7 +237,7 @@ class BaseAuthManager(LoggingMixin): @abstractmethod def is_authorized_custom_view( -self, *, method: ResourceMethod, resource_name: str, user: BaseUser | None = None +self, *, method: ResourceMethod | str, resource_name: str, user: BaseUser | None = None ): """ Return whether the user is authorized to perform a given action on a custom view. @@ -246,7 +246,10 @@ class BaseAuthManager(LoggingMixin): the auth manager is used as part of the environment. It can also be a view defined as part of a plugin defined by a user. -:param method: the method to perform +:param method: the method to perform. 
+The method can also be a string if the action has been defined in a plugin. +In that case, the action can be anything (e.g. can_do). +See https://github.com/apache/airflow/issues/39144 :param resource_name: the name of the resource :param user: the user to perform the action on. If not provided (or None), it uses the current user """ diff --git a/airflow/providers/amazon/aws/auth_manager/avp/entities.py b/airflow/providers/amazon/aws/auth_manager/avp/entities.py index f2c6376729..8c2e8855b8 100644 --- a/airflow/providers/amazon/aws/auth_manager/avp/entities.py +++ b/airflow/providers/amazon/aws/auth_manager/avp/entities.py @@ -55,7 +55,7 @@ def get_entity_type(resource_type: AvpEntities) -> str: return AVP_PREFIX_ENTITIES + resource_type.value -def get_action_id(resource_type: AvpEntities, method: ResourceMethod): +def get_action_id(resource_type: AvpEntities, method: ResourceMethod | str): """ Return action id. diff --git a/airflow/providers/amazon/aws/auth_manager/avp/facade.py b/airflow/providers/amazon/aws/auth_manager/avp/facade.py index 010531155e..4bb9515004 100644 --- a/airflow/providers/amazon/aws/auth_manager/avp/facade.py +++ b/airflow/providers/amazon/aws/auth_manager/avp/facade.py @@ -75,7 +75,7 @@ class AwsAuthManagerAmazonVerifiedPermissionsFacade(LoggingMixin): def is_authorized( self, *, -method: ResourceMethod, +method: ResourceMethod | str, entity_type: AvpEntities, user: AwsAuthManagerUser | None, entity_id: str | None = None, @@ -86,7 +86,10 @@ class AwsAuthManagerAmazonVerifiedPermissionsFacade(LoggingMixin): Check whether the user has permissions to access given resource. -:param method: the method to perform +:param method: the method to perform. +The method can also be a string if the action has been defined in a plugin. +In that case, the action can be anything (e.g. can_do). 
+See https://github.com/apache/airflow/issues/39144 :param entity_type: the entity type the user accesses :param user: the user :param entity_id: the entity ID the user accesses. If not provided, all entities of the type will be diff --git a/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py b/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py index 57b9f9ea0c..f94e4de691 100644 --- a/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py +++ b/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py @@ -197,7 +197,7 @@ class AwsAuthManager(BaseAuthManager): ) def is_authorized_custom_view( -self, *, method: ResourceMethod, resource_name: str,
(airflow) branch main updated: Add examples in AWS auth manager documentation (#39040)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 4a288460a5 Add examples in AWS auth manager documentation (#39040) 4a288460a5 is described below commit 4a288460a501364cd228c4b2f7a24401c4c4e992 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Tue Apr 16 12:05:02 2024 -0300 Add examples in AWS auth manager documentation (#39040) --- .../auth-manager/manage/index.rst | 70 +- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-amazon/auth-manager/manage/index.rst b/docs/apache-airflow-providers-amazon/auth-manager/manage/index.rst index 3d75633f50..0a540b8d32 100644 --- a/docs/apache-airflow-providers-amazon/auth-manager/manage/index.rst +++ b/docs/apache-airflow-providers-amazon/auth-manager/manage/index.rst @@ -134,6 +134,8 @@ Give all permissions to specific user Give all permissions to a group of users +This is equivalent to the :doc:`Admin role in Flask AppBuilder `. + :: permit( @@ -148,6 +150,8 @@ Give all permissions to a group of users Give read-only permissions to a group of users ~~ +This is equivalent to the :doc:`Viewer role in Flask AppBuilder `. + :: permit( @@ -156,7 +160,6 @@ Give read-only permissions to a group of users Airflow::Action::"Configuration.GET", Airflow::Action::"Connection.GET", Airflow::Action::"Custom.GET", - Airflow::Action::"Dag.PUT", Airflow::Action::"Dag.GET", Airflow::Action::"Menu.MENU", Airflow::Action::"Pool.GET", @@ -167,6 +170,71 @@ Give read-only permissions to a group of users resource ); +Give standard Airflow user permissions to a group of users +~~ + +This is equivalent to the :doc:`User role in Flask AppBuilder `. 
+ + :: + + permit( +principal in Airflow::Group::"----", +action in [ + Airflow::Action::"Configuration.GET", + Airflow::Action::"Connection.GET", + Airflow::Action::"Custom.GET", + Airflow::Action::"Dag.GET", + Airflow::Action::"Menu.MENU", + Airflow::Action::"Pool.GET", + Airflow::Action::"Variable.GET", + Airflow::Action::"Dataset.GET", + Airflow::Action::"View.GET", + Airflow::Action::"Dag.POST", + Airflow::Action::"Dag.PUT", + Airflow::Action::"Dag.DELETE", +], +resource + ); + +Give operational permissions to a group of users + + +This is equivalent to the :doc:`Op role in Flask AppBuilder `. + + :: + + permit( +principal in Airflow::Group::"----", +action in [ + Airflow::Action::"Configuration.GET", + Airflow::Action::"Connection.GET", + Airflow::Action::"Custom.GET", + Airflow::Action::"Dag.GET", + Airflow::Action::"Menu.MENU", + Airflow::Action::"Pool.GET", + Airflow::Action::"Variable.GET", + Airflow::Action::"Dataset.GET", + Airflow::Action::"View.GET", + Airflow::Action::"Dag.POST", + Airflow::Action::"Dag.PUT", + Airflow::Action::"Dag.DELETE", + Airflow::Action::"Connection.POST", + Airflow::Action::"Connection.PUT", + Airflow::Action::"Connection.DELETE", + Airflow::Action::"Pool.POST", + Airflow::Action::"Pool.PUT", + Airflow::Action::"Pool.DELETE", + Airflow::Action::"Variable.POST", + Airflow::Action::"Variable.PUT", + Airflow::Action::"Variable.DELETE", + Airflow::Action::"Dataset.POST", + Airflow::Action::"Dataset.PUT", + Airflow::Action::"Dataset.DELETE", + +], +resource + ); + Give DAG specific permissions to a group of users ~
(airflow) branch main updated (246f697873 -> 1ded297509)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 246f697873 Fix PROD image package installation in CI (#39035) add 1ded297509 Remove flag from AWS auth manager to use it (#39033) No new revisions were added by this update. Summary of changes: .../providers/amazon/aws/auth_manager/aws_auth_manager.py | 10 -- airflow/providers/amazon/aws/auth_manager/constants.py | 1 - .../auth-manager/manage/index.rst | 13 ++--- .../amazon/aws/auth_manager/test_aws_auth_manager.py| 3 --- tests/providers/amazon/aws/auth_manager/test_constants.py | 4 tests/providers/amazon/aws/auth_manager/views/test_auth.py | 3 --- .../providers/amazon/aws/tests/test_aws_auth_manager.py | 1 - 7 files changed, 6 insertions(+), 29 deletions(-)
(airflow) branch main updated: Check whether `AUTH_ROLE_PUBLIC` is set in `check_authentication` (#39012)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 21f08ddd93 Check whether `AUTH_ROLE_PUBLIC` is set in `check_authentication` (#39012) 21f08ddd93 is described below commit 21f08ddd93f7ff8ee18f1fec1778f3d016e3db00 Author: Wei Lee AuthorDate: Mon Apr 15 22:37:33 2024 +0800 Check whether `AUTH_ROLE_PUBLIC` is set in `check_authentication` (#39012) --- airflow/api_connexion/security.py | 6 + tests/api_connexion/conftest.py| 15 +- .../endpoints/test_config_endpoint.py | 22 +++ .../endpoints/test_connection_endpoint.py | 89 ++ tests/api_connexion/endpoints/test_dag_endpoint.py | 99 +++ .../endpoints/test_dag_run_endpoint.py | 185 .../endpoints/test_dag_source_endpoint.py | 16 ++ .../endpoints/test_dag_warning_endpoint.py | 12 ++ .../endpoints/test_dataset_endpoint.py | 186 - .../endpoints/test_event_log_endpoint.py | 44 + .../test_role_and_permission_endpoint.py | 63 +++ tests/providers/fab/auth_manager/conftest.py | 15 +- 12 files changed, 749 insertions(+), 3 deletions(-) diff --git a/airflow/api_connexion/security.py b/airflow/api_connexion/security.py index 1cc044d9dd..660bc6cce2 100644 --- a/airflow/api_connexion/security.py +++ b/airflow/api_connexion/security.py @@ -49,6 +49,12 @@ def check_authentication() -> None: response = auth.requires_authentication(Response)() if response.status_code == 200: return + +# Even if the current_user is anonymous, the AUTH_ROLE_PUBLIC might still have permission. 
+appbuilder = get_airflow_app().appbuilder +if appbuilder.get_app.config.get("AUTH_ROLE_PUBLIC", None): +return + # since this handler only checks authentication, not authorization, # we should always return 401 raise Unauthenticated(headers=response.headers) diff --git a/tests/api_connexion/conftest.py b/tests/api_connexion/conftest.py index c860a78f27..481f07fe73 100644 --- a/tests/api_connexion/conftest.py +++ b/tests/api_connexion/conftest.py @@ -40,7 +40,9 @@ def minimal_app_for_api(): ) def factory(): with conf_vars({("api", "auth_backends"): "tests.test_utils.remote_user_api_auth_backend"}): -return app.create_app(testing=True, config={"WTF_CSRF_ENABLED": False}) # type:ignore +_app = app.create_app(testing=True, config={"WTF_CSRF_ENABLED": False}) # type:ignore +_app.config["AUTH_ROLE_PUBLIC"] = None +return _app return factory() @@ -67,3 +69,14 @@ def dagbag(): ) DagBag(include_examples=True, read_dags_from_db=False).sync_to_db() return DagBag(include_examples=True, read_dags_from_db=True) + + +@pytest.fixture +def set_auto_role_public(request): +app = request.getfixturevalue("minimal_app_for_api") +auto_role_public = app.config["AUTH_ROLE_PUBLIC"] +app.config["AUTH_ROLE_PUBLIC"] = request.param + +yield + +app.config["AUTH_ROLE_PUBLIC"] = auto_role_public diff --git a/tests/api_connexion/endpoints/test_config_endpoint.py b/tests/api_connexion/endpoints/test_config_endpoint.py index c091c4ef1c..3dd5814e5d 100644 --- a/tests/api_connexion/endpoints/test_config_endpoint.py +++ b/tests/api_connexion/endpoints/test_config_endpoint.py @@ -222,6 +222,16 @@ class TestGetConfig: assert response.status_code == 403 assert "chose not to expose" in response.json["detail"] +@pytest.mark.parametrize( +"set_auto_role_public, expected_status_code", +(("Public", 403), ("Admin", 200)), +indirect=["set_auto_role_public"], +) +def test_with_auth_role_public_set(self, set_auto_role_public, expected_status_code): +response = self.client.get("/api/v1/config", headers={"Accept": 
"application/json"}) + +assert response.status_code == expected_status_code + class TestGetValue: @pytest.fixture(autouse=True) @@ -339,3 +349,15 @@ class TestGetValue: ) assert response.status_code == 403 assert "chose not to expose" in response.json["detail"] + +@pytest.mark.parametrize( +"set_auto_role_public, expected_status_code", +(("Public", 403), ("Admin", 200)), +indirect=["set_auto_role_public"], +) +def test_with_auth_role_public_set(self, set_auto_role_public, expected_status_code): +response = self.client.get( +
(airflow) branch main updated (259bc1c7b8 -> ed99893853)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 259bc1c7b8 Fix Mypy error introduced in main (#38975) add ed99893853 Fix `DbApiHook.insert_rows` when `rows` is a generator (#38972) No new revisions were added by this update. Summary of changes: airflow/providers/common/sql/hooks/sql.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-)
(airflow) branch main updated: Remove button for reset my password when we have reset password (#38957)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new c3bb80da93 Remove button for reset my password when we have reset password (#38957) c3bb80da93 is described below commit c3bb80da939025dd49b646a819f5e984faf9ddfc Author: Amogh Desai AuthorDate: Fri Apr 12 18:44:58 2024 +0530 Remove button for reset my password when we have reset password (#38957) --- airflow/providers/fab/auth_manager/views/user.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/fab/auth_manager/views/user.py b/airflow/providers/fab/auth_manager/views/user.py index ecb51a4e4e..825248e47a 100644 --- a/airflow/providers/fab/auth_manager/views/user.py +++ b/airflow/providers/fab/auth_manager/views/user.py @@ -79,6 +79,7 @@ class MultiResourceUserMixin: pk = self._deserialize_pk_if_composite(pk) widgets = self._show(pk) widgets["show"].template_args["actions"].pop("userinfoedit", None) +widgets["show"].template_args["actions"].pop("resetmypassword", None) return self.render_template( self.show_template, pk=pk,
(airflow) branch main updated (725c568e73 -> 4a3caa2e35)
This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 725c568e73 fix(airbyte/hooks): add schema and port to prevent InvalidURL error (#38860) add 4a3caa2e35 Amazon Bedrock - System Test Fixes (#38945) No new revisions were added by this update. Summary of changes: tests/system/providers/amazon/aws/example_bedrock.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-)