(airflow) branch main updated: fix(dag): avoid getting dataset next run info for unresolved dataset alias (#41828)

2024-09-20 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new a083930490 fix(dag): avoid getting dataset next run info for 
unresolved dataset alias (#41828)
a083930490 is described below

commit a083930490c33949377594cfbe02ed928d90d899
Author: Wei Lee 
AuthorDate: Fri Sep 20 01:17:45 2024 -0700

fix(dag): avoid getting dataset next run info for unresolved dataset alias 
(#41828)
---
 airflow/models/dag.py|  5 -
 airflow/timetables/simple.py |  4 +++-
 tests/models/test_dag.py | 16 
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6447f7be15..388f322d2d 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3274,7 +3274,10 @@ class DagModel(Base):
     def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None:
         if self.dataset_expression is None:
             return None
-        return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]
+
+        # When a dataset alias does not resolve into datasets, get_dataset_triggered_next_run_info returns
+        # an empty dict as there's no dataset info to get. This method should thus return None.
+        return get_dataset_triggered_next_run_info([self.dag_id], session=session).get(self.dag_id, None)
 
 
 # NOTE: Please keep the list of arguments in sync with DAG.__init__.
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 1c54f4ddee..ad166a6413 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -161,6 +161,8 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
     :meta private:
     """
 
+    UNRESOLVED_ALIAS_SUMMARY = "Unresolved DatasetAlias"
+
     description: str = "Triggered by datasets"
 
     def __init__(self, datasets: BaseDataset) -> None:
@@ -170,7 +172,7 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
             self.dataset_condition = _DatasetAliasCondition(self.dataset_condition.name)
 
         if not next(self.dataset_condition.iter_datasets(), False):
-            self._summary = "Unresolved DatasetAlias"
+            self._summary = DatasetTriggeredTimetable.UNRESOLVED_ALIAS_SUMMARY
         else:
             self._summary = "Dataset"
 
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 093fdcae2f..d5fd2ad729 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3433,6 +3433,22 @@ def test_get_dataset_triggered_next_run_info(dag_maker, clear_datasets):
     }
 
 
+@pytest.mark.need_serialized_dag
+def test_get_dataset_triggered_next_run_info_with_unresolved_dataset_alias(dag_maker, clear_datasets):
+    dataset_alias1 = DatasetAlias(name="alias")
+    with dag_maker(dag_id="dag-1", schedule=[dataset_alias1]):
+        pass
+    dag1 = dag_maker.dag
+    session = dag_maker.session
+    session.flush()
+
+    info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
+    assert info == {}
+
+    dag1_model = DagModel.get_dagmodel(dag1.dag_id)
+    assert dag1_model.get_dataset_triggered_next_run_info(session=session) is None
+
+
 def test_dag_uses_timetable_for_run_id(session):
     class CustomRunIdTimetable(Timetable):
         def generate_run_id(self, *, run_type, logical_date, data_interval, **extra) -> str:
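
The net effect of the change, in isolation (hypothetical values; the real method goes through the database via get_dataset_triggered_next_run_info):

    info_by_dag = {}  # what the helper returns when a dataset alias resolves to no datasets
    # old code: info_by_dag["dag-1"] would raise KeyError for such a DAG
    # new code: .get() falls back to None, which the DagModel method now returns
    assert info_by_dag.get("dag-1", None) is None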



(airflow) branch main updated: Adding a test for the init method of the Databricks Base Hook (#42180)

2024-09-19 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d1efb7425b Adding a test for the init method of the Databricks Base 
Hook (#42180)
d1efb7425b is described below

commit d1efb7425b58617a5c43a9bde988f0f0bf816db6
Author: Bonnie Why 
AuthorDate: Thu Sep 19 05:32:14 2024 -0500

Adding a test for the init method of the Databricks Base Hook (#42180)
---
 tests/always/test_project_structure.py |  1 -
 .../databricks/hooks/test_databricks_base.py   | 33 ++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index a6d174e639..6cf3329e8e 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -101,7 +101,6 @@ class TestProjectStructure:
         "tests/providers/cncf/kubernetes/utils/test_delete_from.py",
         "tests/providers/cncf/kubernetes/utils/test_k8s_hashlib_wrapper.py",
         "tests/providers/cncf/kubernetes/utils/test_xcom_sidecar.py",
-        "tests/providers/databricks/hooks/test_databricks_base.py",
         "tests/providers/google/cloud/fs/test_gcs.py",
         "tests/providers/google/cloud/links/test_automl.py",
         "tests/providers/google/cloud/links/test_base.py",
diff --git a/tests/providers/databricks/hooks/test_databricks_base.py b/tests/providers/databricks/hooks/test_databricks_base.py
new file mode 100644
index 00..fdc019c1b5
--- /dev/null
+++ b/tests/providers/databricks/hooks/test_databricks_base.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.databricks.hooks.databricks_base import BaseDatabricksHook
+
+DEFAULT_CONN_ID = "databricks_default"
+
+
+class TestBaseDatabricksHook:
+    def test_init_exception(self):
+        """
+        Tests handling incorrect parameters passed to ``__init__``
+        """
+        with pytest.raises(ValueError, match="Retry limit must be greater than or equal to 1"):
+            BaseDatabricksHook(databricks_conn_id=DEFAULT_CONN_ID, retry_limit=0)
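
For context, a simplified sketch of the guard this test exercises; the names and exact check are assumptions, since only the error message is visible in the test, and the real BaseDatabricksHook.__init__ takes more parameters:

    class _HookSketch:
        def __init__(self, databricks_conn_id: str = "databricks_default", retry_limit: int = 3) -> None:
            if retry_limit < 1:
                raise ValueError("Retry limit must be greater than or equal to 1")
            self.databricks_conn_id = databricks_conn_id
            self.retry_limit = retry_limit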



(airflow) branch main updated: allow dataset alias to add more than one dataset events (#42189)

2024-09-16 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new a5d0a63d87 allow dataset alias to add more than one dataset events 
(#42189)
a5d0a63d87 is described below

commit a5d0a63d8784d7f4100a4770748c783261968e3c
Author: Wei Lee 
AuthorDate: Mon Sep 16 02:44:22 2024 -0700

allow dataset alias to add more than one dataset events (#42189)
---
 airflow/datasets/__init__.py   |  1 +
 airflow/models/taskinstance.py |  6 +++---
 airflow/serialization/serialized_objects.py| 17 +++
 airflow/utils/context.py   | 10 -
 airflow/utils/context.pyi  |  4 ++--
 tests/serialization/test_serialized_objects.py | 14 ++---
 tests/utils/test_context.py| 29 ++
 7 files changed, 45 insertions(+), 36 deletions(-)

diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index bfddc0f0b0..cd57078095 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -231,6 +231,7 @@ class DatasetAliasEvent(TypedDict):
 
 source_alias_name: str
 dest_dataset_uri: str
+extra: dict[str, Any]
 
 
 def _set_extra_default(extra: dict | None) -> dict:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5f82d84fe5..7169d75c7c 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2905,11 +2905,11 @@ class TaskInstance(Base, LoggingMixin):
                     session=session,
                 )
             elif isinstance(obj, DatasetAlias):
-                if dataset_alias_event := events[obj].dataset_alias_event:
+                for dataset_alias_event in events[obj].dataset_alias_events:
+                    dataset_alias_name = dataset_alias_event["source_alias_name"]
                     dataset_uri = dataset_alias_event["dest_dataset_uri"]
-                    extra = events[obj].extra
+                    extra = dataset_alias_event["extra"]
                     frozen_extra = frozenset(extra.items())
-                    dataset_alias_name = dataset_alias_event["source_alias_name"]
 
                     dataset_tuple_to_alias_names_mapping[(dataset_uri, frozen_extra)].add(dataset_alias_name)
 
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 71e655880d..998b5ba3f4 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -285,15 +285,24 @@ def encode_outlet_event_accessor(var: OutletEventAccessor) -> dict[str, Any]:
     raw_key = var.raw_key
     return {
         "extra": var.extra,
-        "dataset_alias_event": var.dataset_alias_event,
+        "dataset_alias_events": var.dataset_alias_events,
         "raw_key": BaseSerialization.serialize(raw_key),
     }
 
 
 def decode_outlet_event_accessor(var: dict[str, Any]) -> OutletEventAccessor:
-    raw_key = BaseSerialization.deserialize(var["raw_key"])
-    outlet_event_accessor = OutletEventAccessor(extra=var["extra"], raw_key=raw_key)
-    outlet_event_accessor.dataset_alias_event = var["dataset_alias_event"]
+    # This is added for compatibility. The attribute used to be dataset_alias_event and
+    # is now dataset_alias_events.
+    if dataset_alias_event := var.get("dataset_alias_event", None):
+        dataset_alias_events = [dataset_alias_event]
+    else:
+        dataset_alias_events = var.get("dataset_alias_events", [])
+
+    outlet_event_accessor = OutletEventAccessor(
+        extra=var["extra"],
+        raw_key=BaseSerialization.deserialize(var["raw_key"]),
+        dataset_alias_events=dataset_alias_events,
+    )
     return outlet_event_accessor
 
 
diff --git a/airflow/utils/context.py b/airflow/utils/context.py
index c2a0ad7052..a72885401f 100644
--- a/airflow/utils/context.py
+++ b/airflow/utils/context.py
@@ -172,7 +172,7 @@ class OutletEventAccessor:
 
     raw_key: str | Dataset | DatasetAlias
     extra: dict[str, Any] = attrs.Factory(dict)
-    dataset_alias_event: DatasetAliasEvent | None = None
+    dataset_alias_events: list[DatasetAliasEvent] = attrs.field(factory=list)
 
     def add(self, dataset: Dataset | str, extra: dict[str, Any] | None = None) -> None:
         """Add a DatasetEvent to an existing Dataset."""
@@ -190,12 +190,10 @@ class OutletEventAccessor:
         else:
             return
 
-        if extra:
-            self.extra = extra
-
-        self.dataset_alias_event = DatasetAliasEvent(
-            source_alias_name=dataset_alias_name, dest_dataset_uri=dataset_uri
+        event
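
A standalone sketch of the compatibility branch added to decode_outlet_event_accessor above: an old-style payload carrying a single "dataset_alias_event" is wrapped into a list, while new payloads already carry "dataset_alias_events".

    def _events_from_payload(var: dict) -> list:
        # old serialized form: one event (or None) under "dataset_alias_event"
        if dataset_alias_event := var.get("dataset_alias_event", None):
            return [dataset_alias_event]
        # new serialized form: a list under "dataset_alias_events"
        return var.get("dataset_alias_events", [])

    assert _events_from_payload({"dataset_alias_event": {"source_alias_name": "a"}}) == [{"source_alias_name": "a"}]
    assert _events_from_payload({"dataset_alias_events": []}) == []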

(airflow) branch main updated: ci: auto fix default_deferrable value with LibCST (#41984)

2024-09-06 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 07dd9f4deb ci: auto fix default_deferrable value with LibCST (#41984)
07dd9f4deb is described below

commit 07dd9f4deb970920a291b260c81c136e264d0605
Author: Wei Lee 
AuthorDate: Fri Sep 6 22:58:48 2024 +0800

ci: auto fix default_deferrable value with LibCST (#41984)
---
 .pre-commit-config.yaml   |  13 +--
 contributing-docs/08_static_code_checks.rst   |   2 +-
 dev/breeze/doc/images/output_static-checks.svg|  38 
 dev/breeze/doc/images/output_static-checks.txt|   2 +-
 dev/breeze/src/airflow_breeze/pre_commit_ids.py   |   2 +-
 scripts/ci/pre_commit/check_deferrable_default.py | 111 --
 6 files changed, 91 insertions(+), 77 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 44aa21db52..9e91b09613 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -218,6 +218,13 @@ repos:
         files: ^airflow/models/taskinstance.py$|^airflow/models/taskinstancehistory.py$
         pass_filenames: false
         require_serial: true
+      - id: check-deferrable-default
+        name: Check and fix default value of default_deferrable
+        language: python
+        entry: ./scripts/ci/pre_commit/check_deferrable_default.py
+        pass_filenames: false
+        additional_dependencies: ["libcst>=1.4.0"]
+        files: ^airflow/.*/sensors/.*\.py$|^airflow/.*/operators/.*\.py$
   - repo: https://github.com/asottile/blacken-docs
     rev: 1.18.0
     hooks:
@@ -1165,12 +1172,6 @@ repos:
         pass_filenames: true
         files: \.py$
         exclude: ^airflow/providers|^dev/.*\.py$|^scripts/.*\.py$|^tests/|^\w+_tests/|^docs/.*\.py$|^airflow/utils/helpers.py$|^hatch_build.py$
-      - id: check-deferrable-default-value
-        name: Check default value of deferrable attribute
-        language: python
-        entry: ./scripts/ci/pre_commit/check_deferrable_default.py
-        pass_filenames: false
-        files: ^airflow/.*/sensors/.*\.py$|^airflow/.*/operators/.*\.py$
       - id: check-provider-docs-valid
         name: Validate provider doc files
         entry: ./scripts/ci/pre_commit/check_provider_docs.py
diff --git a/contributing-docs/08_static_code_checks.rst b/contributing-docs/08_static_code_checks.rst
index 80be425da0..17506303ff 100644
--- a/contributing-docs/08_static_code_checks.rst
+++ b/contributing-docs/08_static_code_checks.rst
@@ -160,7 +160,7 @@ require Breeze Docker image to be built locally.
 +----------------------------------------------------+----------------------------------------------------------+---------+
 | check-decorated-operator-implements-custom-name    | Check @task decorator implements custom_operator_name   |         |
 +----------------------------------------------------+----------------------------------------------------------+---------+
-| check-deferrable-default-value                      | Check default value of deferrable attribute              |         |
+| check-deferrable-default                            | Check and fix default value of default_deferrable        |         |
 +----------------------------------------------------+----------------------------------------------------------+---------+
 | check-docstring-param-types                         | Check that docstrings do not specify param types         |         |
 +----------------------------------------------------+----------------------------------------------------------+---------+
diff --git a/dev/breeze/doc/images/output_static-checks.svg b/dev/breeze/doc/images/output_static-checks.svg
index 45889ee41b..047ce8361b 100644
--- a/dev/breeze/doc/images/output_static-checks.svg
+++ b/dev/breeze/doc/images/output_static-checks.svg
@@ -324,7 +324,7 @@
 Run static checks.
 
 ╭─ Pre-commit flags [...]
 [regenerated SVG help text omitted: the embedded list of pre-commit check ids now reads "check-deferrable-default" where it previously read "check-deferrable-default-value"]
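
A rough sketch of the kind of LibCST rewrite the new auto-fixing hook performs (an assumption about its internals based on the hook name and the libcst>=1.4.0 dependency; the real script handles more cases):

    import libcst as cst

    NEW_DEFAULT = 'conf.getboolean("operators", "default_deferrable", fallback=False)'


    class DeferrableDefaultFixer(cst.CSTTransformer):
        """Rewrite hard-coded defaults of a `deferrable` parameter to the conf-based default."""

        def leave_Param(self, original_node: cst.Param, updated_node: cst.Param) -> cst.Param:
            if (
                original_node.name.value == "deferrable"
                and original_node.default is not None
                and not isinstance(original_node.default, cst.Call)
            ):
                return updated_node.with_changes(default=cst.parse_expression(NEW_DEFAULT))
            return updated_node


    source = "def __init__(self, *, deferrable: bool = False): ...\n"
    print(cst.parse_module(source).visit(DeferrableDefaultFixer()).code)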

(airflow) branch v2-10-test updated: Rewrite how DAG to dataset / dataset alias are stored (#41987) (#42055)

2024-09-05 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
 new a02325f7b8 Rewrite how DAG to dataset / dataset alias are stored 
(#41987) (#42055)
a02325f7b8 is described below

commit a02325f7b827894fcd7294d11dbd6a656908
Author: Wei Lee 
AuthorDate: Fri Sep 6 14:41:30 2024 +0800

Rewrite how DAG to dataset / dataset alias are stored (#41987) (#42055)
---
 airflow/models/dag.py | 88 +--
 1 file changed, 50 insertions(+), 38 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index f848346780..58213efeec 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3236,8 +3236,6 @@ class DAG(LoggingMixin):
 if not dags:
 return
 
-from airflow.models.dataset import DagScheduleDatasetAliasReference
-
 log.info("Sync %s DAGs", len(dags))
 dag_by_ids = {dag.dag_id: dag for dag in dags}
 
@@ -3344,18 +3342,19 @@ class DAG(LoggingMixin):
 
 from airflow.datasets import Dataset
 from airflow.models.dataset import (
+DagScheduleDatasetAliasReference,
 DagScheduleDatasetReference,
 DatasetModel,
 TaskOutletDatasetReference,
 )
 
-        dag_references: dict[str, set[Dataset | DatasetAlias]] = defaultdict(set)
+        dag_references: dict[str, set[tuple[Literal["dataset", "dataset-alias"], str]]] = defaultdict(set)
         outlet_references = defaultdict(set)
         # We can't use a set here as we want to preserve order
-        outlet_datasets: dict[DatasetModel, None] = {}
-        input_datasets: dict[DatasetModel, None] = {}
+        outlet_dataset_models: dict[DatasetModel, None] = {}
+        input_dataset_models: dict[DatasetModel, None] = {}
         outlet_dataset_alias_models: set[DatasetAliasModel] = set()
-        input_dataset_aliases: set[DatasetAliasModel] = set()
+        input_dataset_alias_models: set[DatasetAliasModel] = set()
 
 # here we go through dags and tasks to check for dataset references
 # if there are now None and previously there were some, we delete them
@@ -3371,12 +3370,12 @@ class DAG(LoggingMixin):
 curr_orm_dag.schedule_dataset_alias_references = []
 else:
 for _, dataset in dataset_condition.iter_datasets():
-dag_references[dag.dag_id].add(Dataset(uri=dataset.uri))
-input_datasets[DatasetModel.from_public(dataset)] = None
+dag_references[dag.dag_id].add(("dataset", dataset.uri))
+input_dataset_models[DatasetModel.from_public(dataset)] = 
None
 
 for dataset_alias in dataset_condition.iter_dataset_aliases():
-dag_references[dag.dag_id].add(dataset_alias)
-
input_dataset_aliases.add(DatasetAliasModel.from_public(dataset_alias))
+dag_references[dag.dag_id].add(("dataset-alias", 
dataset_alias.name))
+
input_dataset_alias_models.add(DatasetAliasModel.from_public(dataset_alias))
 
 curr_outlet_references = curr_orm_dag and 
curr_orm_dag.task_outlet_dataset_references
 for task in dag.tasks:
@@ -3399,63 +3398,70 @@ class DAG(LoggingMixin):
 curr_outlet_references.remove(ref)
 
 for d in dataset_outlets:
+outlet_dataset_models[DatasetModel.from_public(d)] = None
 outlet_references[(task.dag_id, task.task_id)].add(d.uri)
-outlet_datasets[DatasetModel.from_public(d)] = None
 
 for d_a in dataset_alias_outlets:
 
outlet_dataset_alias_models.add(DatasetAliasModel.from_public(d_a))
 
-all_datasets = outlet_datasets
-all_datasets.update(input_datasets)
+all_dataset_models = outlet_dataset_models
+all_dataset_models.update(input_dataset_models)
 
 # store datasets
-stored_datasets: dict[str, DatasetModel] = {}
-new_datasets: list[DatasetModel] = []
-for dataset in all_datasets:
-stored_dataset = session.scalar(
+stored_dataset_models: dict[str, DatasetModel] = {}
+new_dataset_models: list[DatasetModel] = []
+for dataset in all_dataset_models:
+stored_dataset_model = session.scalar(
 select(DatasetModel).where(DatasetModel.uri == 
dataset.uri).limit(1)
 )
-if stored_dataset:
+if stored_dataset_model:
 # Some datasets may have been previously unreferenced, and 
therefore orphaned by the
 # scheduler. But if we're here, then we have found that 
da

(airflow) branch v2-10-test updated: Correct scheduled slots documentation and missing open telemetry span (#41899) (#41985)

2024-09-03 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
 new 333459ee78 Correct scheduled slots documentation and missing open 
telemetry span (#41899) (#41985)
333459ee78 is described below

commit 333459ee788c97d7316e15c2add6bb8e8e53fec1
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Tue Sep 3 21:14:24 2024 -0600

Correct scheduled slots documentation and missing open telemetry span 
(#41899) (#41985)

* Correct documentation for pool.scheduled_slots metrics

* Add missing pool.scheduled_slots telemetry span

(cherry picked from commit 0a816c6f0b500e1b0515452e38e3446412f3e8e3)

Co-authored-by: Matt Burke <84401060+mattogbu...@users.noreply.github.com>
---
 airflow/jobs/scheduler_job_runner.py  | 1 +
 .../administration-and-deployment/logging-monitoring/metrics.rst  | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 69789e9df9..cb82ec8c50 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1835,6 +1835,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 span.set_attribute(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
                 span.set_attribute(f"pool.running_slots.{pool_name}", slot_stats["running"])
                 span.set_attribute(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
+                span.set_attribute(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"])
 
     @provide_session
     def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
index 82597712a8..f3e69a9541 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
@@ -242,8 +242,8 @@ Name                                                 Description
 ``pool.running_slots``                               Number of running slots in the pool. Metric with pool_name tagging.
 ``pool.deferred_slots.<pool_name>``                  Number of deferred slots in the pool
 ``pool.deferred_slots``                              Number of deferred slots in the pool. Metric with pool_name tagging.
-``pool.scheduled_tasks.<pool_name>``                 Number of scheduled tasks in the pool
-``pool.scheduled_tasks``                             Number of scheduled tasks in the pool. Metric with pool_name tagging.
+``pool.scheduled_slots.<pool_name>``                 Number of scheduled slots in the pool
+``pool.scheduled_slots``                             Number of scheduled slots in the pool. Metric with pool_name tagging.
 ``pool.starving_tasks.<pool_name>``                  Number of starving tasks in the pool
 ``pool.starving_tasks``                              Number of starving tasks in the pool. Metric with pool_name tagging.
 ``task.cpu_usage_percent.<dag_id>.<task_id>``        Percentage of CPU used by a task



(airflow) branch v2-10-test updated: Add warning that listeners can be dangerous (#41968)

2024-09-02 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
 new c45d013f9f Add warning that listeners can be dangerous (#41968)
c45d013f9f is described below

commit c45d013f9feb87874f893c892bb5d05aa750edd0
Author: Wei Lee 
AuthorDate: Tue Sep 3 10:55:55 2024 +0800

Add warning that listeners can be dangerous (#41968)

They are not necessarily dangerous, but simply due to the way they are integrated into Airflow, their side effects can be pretty impactful.

Co-authored-by: Jed Cunningham 
---
 docs/apache-airflow/administration-and-deployment/listeners.rst | 5 +
 1 file changed, 5 insertions(+)

diff --git a/docs/apache-airflow/administration-and-deployment/listeners.rst 
b/docs/apache-airflow/administration-and-deployment/listeners.rst
index a8dbda4c5d..34909e225a 100644
--- a/docs/apache-airflow/administration-and-deployment/listeners.rst
+++ b/docs/apache-airflow/administration-and-deployment/listeners.rst
@@ -21,6 +21,11 @@ Listeners
 You can write listeners to enable Airflow to notify you when events happen.
 `Pluggy <https://pluggy.readthedocs.io/en/stable/>`__ powers these listeners.
 
+.. warning::
+
+    Listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in, and
+    can slow down or in some cases take down your Airflow instance. As such, extra care should be taken when writing listeners.
+
 Airflow supports notifications for the following events:
 
 Lifecycle Events



(airflow) branch main updated (22105482fb -> 824e85ad2f)

2024-09-02 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 22105482fb Remove `--tree` flag from `airflow tasks list` command 
(#41964)
 add 824e85ad2f Add warning that listeners can be dangerous (#41966)

No new revisions were added by this update.

Summary of changes:
 docs/apache-airflow/administration-and-deployment/listeners.rst | 5 +
 1 file changed, 5 insertions(+)



(airflow) branch v2-10-test updated: ci: improve check_deferrable_default script to cover positional variables (#41942)

2024-09-02 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
 new f036f3658e ci: improve check_deferrable_default script to cover 
positional variables (#41942)
f036f3658e is described below

commit f036f3658eb18b987e17a2258c40fbda8b80aaa6
Author: Wei Lee 
AuthorDate: Mon Sep 2 17:10:08 2024 +0800

ci: improve check_deferrable_default script to cover positional variables 
(#41942)
---
 scripts/ci/pre_commit/check_deferrable_default.py | 17 -
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/scripts/ci/pre_commit/check_deferrable_default.py b/scripts/ci/pre_commit/check_deferrable_default.py
index 8373385f0d..1e1fd9c7a6 100755
--- a/scripts/ci/pre_commit/check_deferrable_default.py
+++ b/scripts/ci/pre_commit/check_deferrable_default.py
@@ -74,14 +74,21 @@ def iter_check_deferrable_default_errors(module_filename: str) -> Iterator[str]:
 
     for node in init_method_nodes:
         args = node.args
-        arguments = reversed([*args.args, *args.kwonlyargs])
+        arguments = reversed([*args.args, *args.posonlyargs, *args.kwonlyargs])
         defaults = reversed([*args.defaults, *args.kw_defaults])
-        for argument, default in zip(arguments, defaults):
-            if argument is None or default is None:
+        for argument, default in itertools.zip_longest(arguments, defaults):
+            # argument is not deferrable
+            if argument is None or argument.arg != "deferrable":
                 continue
-            if argument.arg != "deferrable" or _is_valid_deferrable_default(default):
+
+            # argument is deferrable, but comes with no default value
+            if default is None:
+                yield f"{module_filename}:{argument.lineno}"
                 continue
-            yield f"{module_filename}:{default.lineno}"
+
+            # argument is deferrable, but the default value is not valid
+            if not _is_valid_deferrable_default(default):
+                yield f"{module_filename}:{default.lineno}"
 
 
 def main() -> int:
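
To see why the script reverses both sequences before pairing: defaults belong to the last parameters, so reversing lets itertools.zip_longest pad the leading, default-less parameters with None (hypothetical signature below):

    import itertools

    argument_names = ["self", "conn_id", "deferrable"]  # hypothetical __init__ signature
    defaults = [False]                                   # only "deferrable" has a default

    pairs = list(itertools.zip_longest(reversed(argument_names), reversed(defaults)))
    assert pairs == [("deferrable", False), ("conn_id", None), ("self", None)]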



(airflow) branch main updated (365b42f5a1 -> 3740b0c396)

2024-08-30 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 365b42f5a1 [FEAT] databricks repair run with reason match and 
appropriate new settings (#41412)
 add 3740b0c396 Use Databricks conn_id from env for DatabricksTaskOperator 
in system test (#41861)

No new revisions were added by this update.

Summary of changes:
 tests/system/providers/databricks/example_databricks_workflow.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow) branch main updated (6e011185bb -> 365b42f5a1)

2024-08-30 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 6e011185bb Support the unpack operator in signature (#41316)
 add 365b42f5a1 [FEAT] databricks repair run with reason match and 
appropriate new settings (#41412)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/databricks/hooks/databricks.py   |  10 ++
 .../providers/databricks/operators/databricks.py   |  67 +-
 .../providers/databricks/hooks/test_databricks.py  |  24 
 .../databricks/operators/test_databricks.py| 147 -
 4 files changed, 244 insertions(+), 4 deletions(-)



(airflow) branch v2-10-test updated: Set end_date and duration for triggers completed with end_from_trigger as True. (#41834)

2024-08-28 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
 new 55f0291017 Set end_date and duration for triggers completed with 
end_from_trigger as True. (#41834)
55f0291017 is described below

commit 55f0291017a2c23b8f6b40d60d373bd3740f2f53
Author: Wei Lee 
AuthorDate: Wed Aug 28 21:54:56 2024 +0800

Set end_date and duration for triggers completed with end_from_trigger as 
True. (#41834)

Co-authored-by: Karthikeyan Singaravelan 
---
 airflow/triggers/base.py |  2 +-
 tests/models/test_trigger.py | 10 +-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py
index 7b5338ad2f..bc1da861f3 100644
--- a/airflow/triggers/base.py
+++ b/airflow/triggers/base.py
@@ -203,7 +203,7 @@ class BaseTaskEndEvent(TriggerEvent):
 """
 # Mark the task with terminal state and prevent it from resuming on 
worker
 task_instance.trigger_id = None
-task_instance.state = self.task_instance_state
+task_instance.set_state(self.task_instance_state, session=session)
 self._submit_callback_if_necessary(task_instance=task_instance, 
session=session)
 self._push_xcoms_if_necessary(task_instance=task_instance)
 
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 4aa5b8b581..407d6edd75 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -19,7 +19,9 @@ from __future__ import annotations
 import datetime
 import json
 from typing import Any, AsyncIterator
+from unittest.mock import patch
 
+import pendulum
 import pytest
 import pytz
 from cryptography.fernet import Fernet
@@ -161,11 +163,15 @@ def test_submit_failure(session, create_task_instance):
 (TaskSkippedEvent, "skipped"),
 ],
 )
-def test_submit_event_task_end(session, create_task_instance, event_cls, expected):
+@patch("airflow.utils.timezone.utcnow")
+def test_submit_event_task_end(mock_utcnow, session, create_task_instance, event_cls, expected):
     """
     Tests that events inheriting BaseTaskEndEvent *don't* re-wake their dependent
     but mark them in the appropriate terminal state and send xcom
     """
+    now = pendulum.now("UTC")
+    mock_utcnow.return_value = now
+
     # Make a trigger
     trigger = Trigger(classpath="does.not.matter", kwargs={})
     trigger.id = 1
@@ -199,6 +205,8 @@ def test_submit_event_task_end(session, create_task_instance, event_cls, expecte
     ti = session.query(TaskInstance).one()
     assert ti.state == expected
     assert ti.next_kwargs is None
+    assert ti.end_date == now
+    assert ti.duration is not None
     actual_xcoms = {x.key: x.value for x in get_xcoms(ti)}
     assert actual_xcoms == {"return_value": "xcomret", "a": "b", "c": "d"}
 



(airflow) branch main updated: Set end_date and duration for triggers completed with end_from_trigger as True. (#41754)

2024-08-28 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 45740b19cf Set end_date and duration for triggers completed with 
end_from_trigger as True. (#41754)
45740b19cf is described below

commit 45740b19cfc5afcd4a3239504384357d7994c1c4
Author: Karthikeyan Singaravelan 
AuthorDate: Wed Aug 28 18:38:13 2024 +0530

Set end_date and duration for triggers completed with end_from_trigger as 
True. (#41754)
---
 airflow/triggers/base.py |  2 +-
 tests/models/test_trigger.py | 10 +-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py
index 7b5338ad2f..bc1da861f3 100644
--- a/airflow/triggers/base.py
+++ b/airflow/triggers/base.py
@@ -203,7 +203,7 @@ class BaseTaskEndEvent(TriggerEvent):
 """
 # Mark the task with terminal state and prevent it from resuming on 
worker
 task_instance.trigger_id = None
-task_instance.state = self.task_instance_state
+task_instance.set_state(self.task_instance_state, session=session)
 self._submit_callback_if_necessary(task_instance=task_instance, 
session=session)
 self._push_xcoms_if_necessary(task_instance=task_instance)
 
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 4aa5b8b581..407d6edd75 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -19,7 +19,9 @@ from __future__ import annotations
 import datetime
 import json
 from typing import Any, AsyncIterator
+from unittest.mock import patch
 
+import pendulum
 import pytest
 import pytz
 from cryptography.fernet import Fernet
@@ -161,11 +163,15 @@ def test_submit_failure(session, create_task_instance):
 (TaskSkippedEvent, "skipped"),
 ],
 )
-def test_submit_event_task_end(session, create_task_instance, event_cls, expected):
+@patch("airflow.utils.timezone.utcnow")
+def test_submit_event_task_end(mock_utcnow, session, create_task_instance, event_cls, expected):
     """
     Tests that events inheriting BaseTaskEndEvent *don't* re-wake their dependent
     but mark them in the appropriate terminal state and send xcom
     """
+    now = pendulum.now("UTC")
+    mock_utcnow.return_value = now
+
     # Make a trigger
     trigger = Trigger(classpath="does.not.matter", kwargs={})
     trigger.id = 1
@@ -199,6 +205,8 @@ def test_submit_event_task_end(session, create_task_instance, event_cls, expecte
     ti = session.query(TaskInstance).one()
     assert ti.state == expected
     assert ti.next_kwargs is None
+    assert ti.end_date == now
+    assert ti.duration is not None
     actual_xcoms = {x.key: x.value for x in get_xcoms(ti)}
     assert actual_xcoms == {"return_value": "xcomret", "a": "b", "c": "d"}
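
The one-line production change swaps a plain attribute assignment for TaskInstance.set_state, which, as the new assertions show, also stamps end_date and duration. A simplified, hypothetical illustration of the difference:

    import datetime


    class _TaskInstanceSketch:
        def __init__(self) -> None:
            self.state = None
            self.start_date = datetime.datetime.now(datetime.timezone.utc)
            self.end_date = None
            self.duration = None

        def set_state(self, state, session=None) -> None:
            # unlike a bare `ti.state = state`, also record when the terminal state was reached
            self.state = state
            self.end_date = datetime.datetime.now(datetime.timezone.utc)
            self.duration = (self.end_date - self.start_date).total_seconds()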
 



(airflow) branch main updated: test(providers/yandex): fix test_yandex_lockbox_secret_backend_get_connection_from_json by removing non-json extra (#41815)

2024-08-27 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new c018a47954 test(providers/yandex): fix 
test_yandex_lockbox_secret_backend_get_connection_from_json by removing 
non-json extra (#41815)
c018a47954 is described below

commit c018a479546ccc5d46eaf6c9aaf68f0d98f330cd
Author: Wei Lee 
AuthorDate: Wed Aug 28 14:22:11 2024 +0800

test(providers/yandex): fix 
test_yandex_lockbox_secret_backend_get_connection_from_json by removing 
non-json extra (#41815)
---
 tests/providers/yandex/secrets/test_lockbox.py | 18 --
 1 file changed, 4 insertions(+), 14 deletions(-)

diff --git a/tests/providers/yandex/secrets/test_lockbox.py b/tests/providers/yandex/secrets/test_lockbox.py
index aad00d4212..7b9ab19d3e 100644
--- a/tests/providers/yandex/secrets/test_lockbox.py
+++ b/tests/providers/yandex/secrets/test_lockbox.py
@@ -27,7 +27,6 @@ import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb
 import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb
 import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb
 
-from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.providers.yandex.secrets.lockbox import LockboxSecretBackend
 from airflow.providers.yandex.utils.defaults import default_conn_name
 
@@ -60,7 +59,7 @@ class TestLockboxSecretBackend:
     def test_yandex_lockbox_secret_backend_get_connection_from_json(self, mock_get_value):
         conn_id = "airflow_to_yandexcloud"
         conn_type = "yandex_cloud"
-        extra = "some extra values"
+        extra = '{"some": "extra values"}'
         c = {
             "conn_type": conn_type,
             "extra": extra,
 
@@ -68,18 +67,9 @@ class TestLockboxSecretBackend:
 
         mock_get_value.return_value = json.dumps(c)
 
-        match = "Encountered non-JSON in `extra` field for connection 'airflow_to_yandexcloud'. Support for non-JSON `extra` will be removed in Airflow 3.0"
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=match,
-        ):
-            conn = LockboxSecretBackend().get_connection(conn_id)
-
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match=match,
-        ):
-            assert conn.extra == extra
+        conn = LockboxSecretBackend().get_connection(conn_id)
+
+        assert conn.extra == extra
 
         assert conn.conn_id == conn_id
         assert conn.conn_type == conn_type
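
The shape of the Lockbox secret the adjusted test stores is roughly the following; the point of the fix is that the connection's `extra` must itself be a JSON string rather than free text (values taken from the test):

    import json

    secret_payload = json.dumps(
        {
            "conn_type": "yandex_cloud",
            "extra": '{"some": "extra values"}',  # JSON, not "some extra values"
        }
    )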



(airflow) branch main updated (3c477e1e26 -> 938ab82c22)

2024-08-27 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 3c477e1e26 Remove deprecations in airflow.models.param (#41776)
 add 938ab82c22 Add logging device and logging device options to 
DockerSwarmOperator (#41416)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/docker/operators/docker_swarm.py | 27 +++-
 docs/spelling_wordlist.txt |  2 +
 .../docker/operators/test_docker_swarm.py  | 50 ++
 3 files changed, 78 insertions(+), 1 deletion(-)



(airflow) branch main updated: docs(deferring): fix wrong example and remove unnecessay example (#41691)

2024-08-26 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new ef8f3498b4 docs(deferring): fix wrong example and remove unnecessay 
example (#41691)
ef8f3498b4 is described below

commit ef8f3498b41cdd7927ed6f0ddcce7664a4707ab0
Author: Wei Lee 
AuthorDate: Mon Aug 26 16:41:01 2024 +0800

docs(deferring): fix wrong example and remove unnecessay example (#41691)
---
 .../authoring-and-scheduling/deferring.rst | 77 +++---
 1 file changed, 37 insertions(+), 40 deletions(-)

diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
index def9246968..0b477151a9 100644
--- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -171,12 +171,16 @@ Here's a basic example of how a sensor might trigger deferral:
 
 .. code-block:: python
 
+    from __future__ import annotations
+
     from datetime import timedelta
-    from typing import Any
+    from typing import TYPE_CHECKING, Any
 
     from airflow.sensors.base import BaseSensorOperator
     from airflow.triggers.temporal import TimeDeltaTrigger
-    from airflow.utils.context import Context
+
+    if TYPE_CHECKING:
+        from airflow.utils.context import Context
 
 
     class WaitOneHourSensor(BaseSensorOperator):
@@ -187,6 +191,7 @@ Here's a basic example of how a sensor might trigger deferral:
             # We have no more work to do here. Mark as complete.
             return
 
+
 When you opt to defer, your operator will stop executing at that point and be removed from its current worker. No state will persist, such as local variables or attributes set on ``self``. When your operator resumes, it resumes as a new instance of it. The only way you can pass state from the old instance of the operator to the new one is with ``method_name`` and ``kwargs``.
 
 When your operator resumes, Airflow adds a ``context`` object and an ``event`` object to the kwargs passed to the ``method_name`` method. This ``event`` object contains the payload from the trigger event that resumed your operator. Depending on the trigger, this can be useful to your operator, like it's a status code or URL to fetch results. Or, it might be unimportant information, like a datetime. Your ``method_name`` method, however, *must* accept ``context`` and ``event`` as a keyword [...]
@@ -209,47 +214,31 @@ Triggering Deferral from Task Start
 If you want to defer your task directly to the triggerer without going into the worker, you can set class level attribute ``start_from_trigger`` to ``True`` and add a class level attribute ``start_trigger_args`` with an ``StartTriggerArgs`` object with the following 4 attributes to your deferrable operator:
 
 * ``trigger_cls``: An importable path to your trigger class.
-* ``trigger_kwargs``: Keyword arguments to pass to the ``trigger_cls`` when it's initialized. **Note that all the arguments need to be serializable. It's the main limitation of this feature.**
+* ``trigger_kwargs``: Keyword arguments to pass to the ``trigger_cls`` when it's initialized. **Note that all the arguments need to be serializable by Airflow. It's the main limitation of this feature.**
 * ``next_method``: The method name on your operator that you want Airflow to call when it resumes.
 * ``next_kwargs``: Additional keyword arguments to pass to the ``next_method`` when it is called.
 * ``timeout``: (Optional) A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Defaults to ``None``, which means no timeout.
 
-This is particularly useful when deferring is the only thing the ``execute`` method does. Here's a basic refinement of the previous example. In the previous example, we used ``DateTimeTrigger`` which takes an argument ``delta`` with type ``datetime.timedelta`` which is not serializable. Thus, we need to create a new trigger with serializable arguments.
+In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``trigger_cls``.
 
 .. code-block:: python
 
     from __future__ import annotations
 
-    import datetime
-
-    from airflow.triggers.temporal import DateTimeTrigger
-    from airflow.utils import timezone
-
-
-    class HourDeltaTrigger(DateTimeTrigger):
-        def __init__(self, hours: int):
-            moment = timezone.utcnow() + datetime.timedelta(hours=hours)
-            super().__init__(moment=moment)
-
-
-In the sensor part, we'll need to provide the path to ``HourDeltaTrigger`` as ``trigger_cls``.
-
-.. code-block:: python
-
-    from __future__ import annotations
-
-    from typing import Any
+    from datetime import timedelta
+
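
Pieced together from the attribute list above, a sensor that defers straight from the triggerer would look roughly like this (a sketch, assuming StartTriggerArgs is importable from airflow.triggers.base in this Airflow version and that TimeDeltaTrigger's timedelta kwarg is serializable by Airflow, as the updated text states):

    from __future__ import annotations

    from datetime import timedelta

    from airflow.sensors.base import BaseSensorOperator
    from airflow.triggers.base import StartTriggerArgs  # assumed import path


    class WaitOneHourSensor(BaseSensorOperator):
        start_from_trigger = True
        start_trigger_args = StartTriggerArgs(
            trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
            trigger_kwargs={"delta": timedelta(hours=1)},
            next_method="execute_complete",
            next_kwargs=None,
            timeout=None,
        )

        def execute_complete(self, context, event=None) -> None:
            # Resumed here once the trigger fires; nothing left to do.
            return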

(airflow) branch main updated: feat(providers/openai): support batch api in hook/operator/trigger (#41554)

2024-08-22 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 00e73e6089 feat(providers/openai): support batch api in 
hook/operator/trigger (#41554)
00e73e6089 is described below

commit 00e73e6089f2d54a38944ec47303aa00f9d211d7
Author: Josix 
AuthorDate: Thu Aug 22 18:34:35 2024 +0800

feat(providers/openai): support batch api in hook/operator/trigger (#41554)

* feat(providers/openai)
* support batch api in hook/operator/trigger
* add wait_for_completion to OpenAITriggerBatchOperator

-

Co-authored-by: YungHsiu Chen 
Co-authored-by: Wei Lee 
---
 airflow/providers/openai/exceptions.py |  28 
 airflow/providers/openai/hooks/openai.py   | 103 -
 airflow/providers/openai/operators/openai.py   |  94 +++-
 airflow/providers/openai/provider.yaml |   5 +
 airflow/providers/openai/triggers/__init__.py  |  16 ++
 airflow/providers/openai/triggers/openai.py| 112 ++
 .../operators/openai.rst   |  25 
 tests/providers/openai/hooks/test_openai.py|  72 -
 tests/providers/openai/operators/test_openai.py|  73 -
 tests/providers/openai/test_exceptions.py  |  39 +
 tests/providers/openai/triggers/__init__.py|  16 ++
 tests/providers/openai/triggers/test_openai.py | 166 +
 .../openai/example_trigger_batch_operator.py   | 117 +++
 13 files changed, 858 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/openai/exceptions.py 
b/airflow/providers/openai/exceptions.py
new file mode 100644
index 00..eafba088c4
--- /dev/null
+++ b/airflow/providers/openai/exceptions.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.exceptions import AirflowException
+
+
+class OpenAIBatchJobException(AirflowException):
+"""Raise when OpenAI Batch Job fails to start AFTER processing the 
request."""
+
+
+class OpenAIBatchTimeout(AirflowException):
+"""Raise when OpenAI Batch Job times out."""
diff --git a/airflow/providers/openai/hooks/openai.py 
b/airflow/providers/openai/hooks/openai.py
index e66283afd6..cc8375f9ba 100644
--- a/airflow/providers/openai/hooks/openai.py
+++ b/airflow/providers/openai/hooks/openai.py
@@ -17,6 +17,8 @@
 
 from __future__ import annotations
 
+import time
+from enum import Enum
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, BinaryIO, Literal
 
@@ -24,6 +26,7 @@ from openai import OpenAI
 
 if TYPE_CHECKING:
 from openai.types import FileDeleted, FileObject
+from openai.types.batch import Batch
 from openai.types.beta import (
 Assistant,
 AssistantDeleted,
@@ -42,8 +45,29 @@ if TYPE_CHECKING:
 ChatCompletionToolMessageParam,
 ChatCompletionUserMessageParam,
 )
-
 from airflow.hooks.base import BaseHook
+from airflow.providers.openai.exceptions import OpenAIBatchJobException, 
OpenAIBatchTimeout
+
+
+class BatchStatus(str, Enum):
+"""Enum for the status of a batch."""
+
+VALIDATING = "validating"
+FAILED = "failed"
+IN_PROGRESS = "in_progress"
+FINALIZING = "finalizing"
+COMPLETED = "completed"
+EXPIRED = "expired"
+CANCELLING = "cancelling"
+CANCELLED = "cancelled"
+
+def __str__(self) -> str:
+return str(self.value)
+
+@classmethod
+def is_in_progress(cls, status: str) -> bool:
+"""Check if the batch status is in progress."""
+return status in (cls.VALIDATING, cls.IN_PROGRESS, cls.FINALIZING)
 
 
 class OpenAIHook(BaseHook):
@@ -288,13 +312,13 @@ class OpenAIHook(BaseHook):
 embeddings: list[float] = response.data[0].embedding
 return embeddings
 
-def 
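
Because BatchStatus above subclasses str, status strings returned by the OpenAI API compare directly against the enum members; a quick usage check (assuming the provider version from this commit is installed):

    from airflow.providers.openai.hooks.openai import BatchStatus

    assert BatchStatus.is_in_progress("validating") is True
    assert BatchStatus.is_in_progress("completed") is False
    assert str(BatchStatus.COMPLETED) == "completed"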

(airflow) branch main updated: remove the removed --use-migration-files argument of "airflow db reset" command in run_generate_migration.sh (#41621)

2024-08-20 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 73cc7538ad remove the removed --use-migration-files argument of 
"airflow db reset" command in run_generate_migration.sh (#41621)
73cc7538ad is described below

commit 73cc7538adb107c892794b6451e2982b04863b9e
Author: Wei Lee 
AuthorDate: Wed Aug 21 11:07:30 2024 +0800

remove the removed --use-migration-files argument of "airflow db reset" 
command in run_generate_migration.sh (#41621)
---
 scripts/in_container/run_generate_migration.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/scripts/in_container/run_generate_migration.sh 
b/scripts/in_container/run_generate_migration.sh
index 77030ab852..50a6985513 100755
--- a/scripts/in_container/run_generate_migration.sh
+++ b/scripts/in_container/run_generate_migration.sh
@@ -16,9 +16,9 @@
 # specific language governing permissions and limitations
 # under the License.
 # shellcheck source=scripts/in_container/_in_container_script_init.sh
-. "$( dirname "${BASH_SOURCE[0]}" )/_in_container_script_init.sh"
+. "$(dirname "${BASH_SOURCE[0]}")/_in_container_script_init.sh"
 
 cd "${AIRFLOW_SOURCES}" || exit 1
 cd "airflow" || exit 1
-airflow db reset --use-migration-files
+airflow db reset
 alembic revision --autogenerate -m "${@}"



(airflow) branch v2-10-test updated (d4c5a98aff -> 5e12fa981a)

2024-08-20 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


from d4c5a98aff Fix UI rendering when XCom is INT, FLOAT, BOOL or NULL 
(#41516) (#41605)
 add 5e12fa981a Fix InletEventsAccessors type stub (#41572) (#41607)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/context.pyi | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow-site) branch 2.10.0-blog-post updated (73bbed0bdb -> 88fd06952a)

2024-08-15 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch 2.10.0-blog-post
in repository https://gitbox.apache.org/repos/asf/airflow-site.git


from 73bbed0bdb Minor changes
 add 88fd06952a style: replace tab with spaces

No new revisions were added by this update.

Summary of changes:
 .../site/content/en/blog/airflow-2.10.0/index.md   | 25 --
 1 file changed, 14 insertions(+), 11 deletions(-)



(airflow) branch main updated (9489f50ef7 -> 5790cf7128)

2024-08-15 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 9489f50ef7 Fix try selector refresh (#41483)
 add 5790cf7128 Fix: Pass hook parameters to SnowflakeSqlApiHook and prep 
them for API call (#41150)

No new revisions were added by this update.

Summary of changes:
 .../providers/snowflake/hooks/snowflake_sql_api.py |  11 +-
 airflow/providers/snowflake/operators/snowflake.py |   1 +
 .../snowflake/hooks/test_snowflake_sql_api.py  | 140 +
 3 files changed, 148 insertions(+), 4 deletions(-)



(airflow) branch main updated (4e62909ff5 -> 3b42286aa4)

2024-08-12 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 4e62909ff5 Add comment on methodtools in mssql provider.yaml (#41402)
 add 3b42286aa4 Fix missing source link for the mapped task with index 0 
(#41403)

No new revisions were added by this update.

Summary of changes:
 airflow/www/static/js/components/SourceTaskInstance.tsx | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow) branch main updated: Adding Dataset Alias Example DAG with classic operators (#41302)

2024-08-08 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 5fe1d45111 Adding Dataset Alias Example DAG with classic operators 
(#41302)
5fe1d45111 is described below

commit 5fe1d4577563c4c703d35817e59172f736cc
Author: vatsrahul1001 <43964496+vatsrahul1...@users.noreply.github.com>
AuthorDate: Thu Aug 8 13:05:20 2024 +0530

Adding Dataset Alias Example DAG with classic operators (#41302)
---
 airflow/example_dags/example_dataset_alias.py  |   6 +-
 .../example_dataset_alias_with_no_taskflow.py  | 108 +
 tests/www/views/test_views_acl.py  |  20 
 3 files changed, 131 insertions(+), 3 deletions(-)

diff --git a/airflow/example_dags/example_dataset_alias.py 
b/airflow/example_dags/example_dataset_alias.py
index 37d368ccb1..c50a89e34f 100644
--- a/airflow/example_dags/example_dataset_alias.py
+++ b/airflow/example_dags/example_dataset_alias.py
@@ -22,15 +22,15 @@ Notes on usage:
 
 Turn on all the DAGs.
 
-Before running any DAG, the schedule of the "dataset-alias-consumer" DAG will 
show as "Unresolved DatasetAlias".
+Before running any DAG, the schedule of the 
"dataset_alias_example_alias_consumer" DAG will show as "Unresolved 
DatasetAlias".
 This is expected because the dataset alias has not been resolved into any 
dataset yet.
 
-Once the "dataset-alias-producer" DAG is triggered, the "dataset-consumer" DAG 
should be triggered upon completion.
+Once the "dataset_s3_bucket_producer" DAG is triggered, the 
"dataset_s3_bucket_consumer" DAG should be triggered upon completion.
 This is because the dataset alias "example-alias" is used to add a dataset 
event to the dataset "s3://bucket/my-task"
 during the "produce_dataset_events_through_dataset_alias" task.
 As the DAG "dataset-alias-consumer" relies on dataset alias "example-alias" 
which was previously unresolved,
 the DAG "dataset-alias-consumer" (along with all the DAGs in the same file) 
will be re-parsed and
-thus update its schedule to the dataset "s3://bucket/my-task" and will be 
triggered.
+thus update its schedule to the dataset "s3://bucket/my-task" and will also be 
triggered.
 """
 
 from __future__ import annotations
diff --git a/airflow/example_dags/example_dataset_alias_with_no_taskflow.py 
b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
new file mode 100644
index 00..7d7227af39
--- /dev/null
+++ b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG for demonstrating the behavior of the DatasetAlias feature in 
Airflow, including conditional and
+dataset expression-based scheduling.
+
+Notes on usage:
+
+Turn on all the DAGs.
+
+Before running any DAG, the schedule of the 
"dataset_alias_example_alias_consumer_with_no_taskflow" DAG will show as 
"unresolved DatasetAlias".
+This is expected because the dataset alias has not been resolved into any 
dataset yet.
+
+Once the "dataset_s3_bucket_producer_with_no_taskflow" DAG is triggered, the 
"dataset_s3_bucket_consumer_with_no_taskflow" DAG should be triggered upon 
completion.
+This is because the dataset alias "example-alias-no-taskflow" is used to add a 
dataset event to the dataset "s3://bucket/my-task-with-no-taskflow"
+during the "produce_dataset_events_through_dataset_alias_with_no_taskflow" 
task. Also, the schedule of the 
"dataset_alias_example_alias_consumer_with_no_taskflow" DAG should change to 
"Dataset" as
+the dataset alias "example-alias-no-taskflow" is now resolved to the dataset 
"s3://bucket/my-task-with-no-taskflow" and this DAG should also be triggered.
+"""
+
+from __future__ import annotations
+
+import pendulum
+
+from a

(airflow) branch main updated: fix wrong link to the source DAG in consumer DAG's dataset event section #41301

2024-08-07 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 49669c95dc fix wrong link to the source DAG in consumer DAG's dataset 
event section #41301
49669c95dc is described below

commit 49669c95dcbe2af0afc60559c740626eb05db52c
Author: Wei Lee 
AuthorDate: Thu Aug 8 11:56:10 2024 +0800

fix wrong link to the source DAG in consumer DAG's dataset event section 
#41301
---
 airflow/www/static/js/components/SourceTaskInstance.tsx | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/www/static/js/components/SourceTaskInstance.tsx 
b/airflow/www/static/js/components/SourceTaskInstance.tsx
index e2db366c29..fd64faddb4 100644
--- a/airflow/www/static/js/components/SourceTaskInstance.tsx
+++ b/airflow/www/static/js/components/SourceTaskInstance.tsx
@@ -35,6 +35,7 @@ type SourceTIProps = {
 };
 
 const gridUrl = getMetaValue("grid_url");
+const dagId = getMetaValue("dag_id") || "__DAG_ID__";
 
 const SourceTaskInstance = ({
   datasetEvent,
@@ -56,7 +57,7 @@ const SourceTaskInstance = ({
   });
 
   let url = `${gridUrl?.replace(
-"__DAG_ID__",
+dagId,
 sourceDagId || ""
   )}?dag_run_id=${encodeURIComponent(
 sourceRunId || ""



(airflow) branch main updated (16abb911e8 -> 1544b08523)

2024-08-06 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 16abb911e8 Upgrade package gcloud-aio-auth>=5.2.0 (#41262)
 add 1544b08523 Change inserted airflow version of 
"update-migration-references" command from airflow_version='...' to 
airflow_version="..." #41275

No new revisions were added by this update.

Summary of changes:
 scripts/in_container/run_migration_reference.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow) branch main updated: Adjust gantt width based on task history dates (#41192)

2024-08-01 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new a9baf71304 Adjust gantt width based on task history dates (#41192)
a9baf71304 is described below

commit a9baf71304650bf3ed45187ac65ff7647bdedf74
Author: Brent Bovenzi 
AuthorDate: Fri Aug 2 00:40:01 2024 -0400

Adjust gantt width based on task history dates (#41192)
---
 airflow/www/static/js/dag/details/gantt/Row.tsx   | 17 +-
 airflow/www/static/js/dag/details/gantt/index.tsx | 67 +++
 2 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/airflow/www/static/js/dag/details/gantt/Row.tsx 
b/airflow/www/static/js/dag/details/gantt/Row.tsx
index c93d1d7fde..15df6da5c7 100644
--- a/airflow/www/static/js/dag/details/gantt/Row.tsx
+++ b/airflow/www/static/js/dag/details/gantt/Row.tsx
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-import React from "react";
+import React, { useEffect } from "react";
 import { Box } from "@chakra-ui/react";
 import useSelection from "src/dag/useSelection";
 import { boxSize } from "src/dag/StatusBox";
@@ -32,6 +32,11 @@ interface Props {
   task: Task;
   ganttStartDate?: string | null;
   ganttEndDate?: string | null;
+  setGanttDuration?: (
+queued: string | null | undefined,
+start: string | null | undefined,
+end: string | null | undefined
+  ) => void;
 }
 
 const dagId = getMetaValue("dag_id");
@@ -42,6 +47,7 @@ const Row = ({
   task,
   ganttStartDate,
   ganttEndDate,
+  setGanttDuration,
 }: Props) => {
   const {
 selected: { runId, taskId },
@@ -61,6 +67,15 @@ const Row = ({
   const isSelected = taskId === instance?.taskId;
   const isOpen = openGroupIds.includes(task.id || "");
 
+  // Adjust gantt start/end if the ti history dates are out of bounds
+  useEffect(() => {
+tiHistory?.forEach(
+  (tih) =>
+setGanttDuration &&
+setGanttDuration(tih.queuedWhen, tih.startDate, tih.endDate)
+);
+  }, [tiHistory, setGanttDuration]);
+
   return (
 
{
 
   const dagRun = dagRuns.find((dr) => dr.runId === runId);
 
-  let startDate = dagRun?.queuedAt || dagRun?.startDate;
-  // @ts-ignore
-  let endDate = dagRun?.endDate ?? moment().add(1, "s").toString();
+  const [startDate, setStartDate] = useState(
+dagRun?.queuedAt || dagRun?.startDate
+  );
+
+  const [endDate, setEndDate] = useState(
+// @ts-ignore
+dagRun?.endDate ?? moment().add(1, "s").toString()
+  );
 
   // Check if any task instance dates are outside the bounds of the dag run 
dates and update our min start and max end
-  groups.children?.forEach((task) => {
-const taskInstance = task.instances.find((ti) => ti.runId === runId);
-if (
-  taskInstance?.queuedDttm &&
-  (!startDate ||
-Date.parse(taskInstance.queuedDttm) < Date.parse(startDate))
-) {
-  startDate = taskInstance.queuedDttm;
-} else if (
-  taskInstance?.startDate &&
-  (!startDate || Date.parse(taskInstance.startDate) < 
Date.parse(startDate))
-) {
-  startDate = taskInstance.startDate;
-}
+  const setGanttDuration = useCallback(
+(
+  queued: string | null | undefined,
+  start: string | null | undefined,
+  end: string | null | undefined
+) => {
+  if (
+queued &&
+(!startDate || Date.parse(queued) < Date.parse(startDate))
+  ) {
+setStartDate(queued);
+  } else if (
+start &&
+(!startDate || Date.parse(start) < Date.parse(startDate))
+  ) {
+setStartDate(start);
+  }
+
+  if (end && (!endDate || Date.parse(end) > Date.parse(endDate))) {
+setEndDate(end);
+  }
+},
+[startDate, endDate, setStartDate, setEndDate]
+  );
 
-if (
-  taskInstance?.endDate &&
-  (!endDate || Date.parse(taskInstance.endDate) > Date.parse(endDate))
-) {
-  endDate = taskInstance.endDate;
-}
-  });
+  useEffect(() => {
+groups.children?.forEach((task) => {
+  const taskInstance = task.instances.find((ti) => ti.runId === runId);
+  setGanttDuration(
+taskInstance?.queuedDttm,
+taskInstance?.startDate,
+taskInstance?.endDate
+  );
+});
+  }, [groups.children, runId, setGanttDuration]);
 
   const numBars = Math.round(width / 100);
   const runDuration = getDuration(startDate, endDate);
@@ -195,6 +213,7 @@ const Gantt = ({ openGroupIds, gridScrollRef, 
ganttScrollRef }: Props) => {
 task={c}
 ganttStartDate={startDate}
 ganttEndDate={endDate}
+setGanttDuration={setGanttDuration}
 key={`gantt-${c.id}`}
   />
 ))}



(airflow) branch main updated (9f7599511f -> b28e8bfd03)

2024-08-01 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 9f7599511f Return string representation if xcomargs existing when 
include_xcom is set to False during resolving
 add b28e8bfd03 Allowing DateTimeSensorAsync, FileSensor and 
TimeSensorAsync to start execution from trigger during dynamic task mapping 
(#41182)

No new revisions were added by this update.

Summary of changes:
 airflow/sensors/date_time.py   |  11 +-
 airflow/sensors/filesystem.py  |   5 +-
 airflow/sensors/time_sensor.py |   6 +-
 .../authoring-and-scheduling/deferring.rst | 119 +++--
 4 files changed, 83 insertions(+), 58 deletions(-)



(airflow) branch main updated (67f117060e -> 9f7599511f)

2024-08-01 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 67f117060e Fix default behaviour for init function in AppriseNotifier 
(#41054)
 add 9f7599511f Return string representation if xcomargs existing when 
include_xcom is set to False during resolving

No new revisions were added by this update.

Summary of changes:
 airflow/models/expandinput.py | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)



(airflow) branch main updated: fix wrong link in TriggeredDagRuns (#41166)

2024-07-31 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new a6f954e873 fix wrong link in TriggeredDagRuns (#41166)
a6f954e873 is described below

commit a6f954e873e363608d79d0e444c5ef1099b911e8
Author: Wei Lee 
AuthorDate: Thu Aug 1 13:35:32 2024 +0800

fix wrong link in TriggeredDagRuns (#41166)
---
 airflow/www/static/js/components/TriggeredDagRuns.tsx | 9 +
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/airflow/www/static/js/components/TriggeredDagRuns.tsx 
b/airflow/www/static/js/components/TriggeredDagRuns.tsx
index f017fbe488..26cf1abac6 100644
--- a/airflow/www/static/js/components/TriggeredDagRuns.tsx
+++ b/airflow/www/static/js/components/TriggeredDagRuns.tsx
@@ -43,10 +43,11 @@ const TriggeredDagRuns = ({ createdDagRuns, showLink = true 
}: CardProps) => {
 
   {createdDagRuns.map((run) => {
 const runId = (run as any).dagRunId; // For some reason the type is 
wrong here
-const url = `${gridUrl?.replace(
-  "__DAG_ID__",
-  run.dagId || ""
-)}?dag_run_id=${encodeURIComponent(runId)}`;
+const splitGridUrl = gridUrl.split("/");
+splitGridUrl[2] = run.dagId || "";
+const url = `${splitGridUrl.join("/")}?dag_run_id=${encodeURIComponent(
+  runId
+)}`;
 
 return (
   

(airflow) branch main updated (89fd695aa8 -> 2521555989)

2024-07-30 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 89fd695aa8 add example dag for dataset_alias (#41037)
 add 2521555989 Add start execution from trigger support for existing core 
sensors (#41021)

No new revisions were added by this update.

Summary of changes:
 airflow/sensors/date_time.py   | 23 +--
 airflow/sensors/filesystem.py  | 21 +
 airflow/sensors/time_sensor.py | 24 +++-
 3 files changed, 65 insertions(+), 3 deletions(-)



(airflow) branch main updated: add example dag for dataset_alias (#41037)

2024-07-30 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 89fd695aa8 add example dag for dataset_alias (#41037)
89fd695aa8 is described below

commit 89fd695aa8fa5d5185118c6d0a3c4cf45fa5
Author: Wei Lee 
AuthorDate: Tue Jul 30 18:06:07 2024 +0800

add example dag for dataset_alias (#41037)
---
 airflow/example_dags/example_dataset_alias.py | 104 ++
 tests/always/test_example_dags.py |   2 +-
 2 files changed, 105 insertions(+), 1 deletion(-)

diff --git a/airflow/example_dags/example_dataset_alias.py 
b/airflow/example_dags/example_dataset_alias.py
new file mode 100644
index 00..1e29241b8b
--- /dev/null
+++ b/airflow/example_dags/example_dataset_alias.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG for demonstrating the behavior of the DatasetAlias feature in Airflow, including conditional and
+dataset expression-based scheduling.
+
+Notes on usage:
+
+Turn on all the DAGs.
+
+Before running any DAG, the schedule of the "dataset_alias_example_alias_consumer" DAG will show as "unresolved DatasetAlias".
+This is expected because the dataset alias has not been resolved into any dataset yet.
+
+Once the "dataset_alias_example_alias_producer" DAG is triggered, the "dataset_s3_bucket_consumer" DAG should be triggered upon completion.
+This is because the dataset alias "example-alias" is used to add a dataset event to the dataset "s3://bucket/my-task"
+during the "produce_dataset_events_through_dataset_alias" task.
+After the completion of this task, the schedule of the "dataset_alias_example_alias_consumer" DAG should change to "Dataset" as
+the dataset alias "example-alias" is now resolved to the dataset "s3://bucket/my-task".
+It's expected that the "dataset_alias_example_alias_consumer" DAG is not triggered at this point, despite also relying on
+the dataset alias "example-alias", which was initially resolved to nothing.
+Once the resolution occurs, triggering either the "dataset_s3_bucket_producer" or "dataset_alias_example_alias_producer" DAG should
+also trigger both the "dataset_s3_bucket_consumer" and "dataset_alias_example_alias_consumer" DAGs.
+"""
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow import DAG
+from airflow.datasets import Dataset, DatasetAlias
+from airflow.decorators import task
+
+with DAG(
+dag_id="dataset_s3_bucket_producer",
+start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+schedule=None,
+catchup=False,
+tags=["producer", "dataset"],
+):
+
+@task(outlets=[Dataset("s3://bucket/my-task")])
+def produce_dataset_events():
+pass
+
+produce_dataset_events()
+
+with DAG(
+dag_id="dataset_alias_example_alias_producer",
+start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+schedule=None,
+catchup=False,
+tags=["producer", "dataset-alias"],
+):
+
+@task(outlets=[DatasetAlias("example-alias")])
+def produce_dataset_events_through_dataset_alias(*, outlet_events=None):
+bucket_name = "bucket"
+object_path = "my-task"
+
outlet_events["example-alias"].add(Dataset(f"s3://{bucket_name}/{object_path}"))
+
+produce_dataset_events_through_dataset_alias()
+
+with DAG(
+dag_id="dataset_s3_bucket_consumer",
+start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+schedule=[Dataset("s3://bucket/my-task")],
+catchup=False,
+tags=["consumer", "dataset"],
+):
+
+@task
+def consume_dataset_event():
+pass
+
+consume_dataset_event()
+
+with DAG(
+dag_id="dataset_alias_example_alias_consumer",
+start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+schedule=[DatasetAlias("example-alias")],
+
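
The archive cuts off the last DAG in this diff. A plausible completion of the alias consumer, sketched under the assumption that it mirrors the bucket consumer above (the task name and tags are guesses):

    import pendulum

    from airflow import DAG
    from airflow.datasets import DatasetAlias
    from airflow.decorators import task

    with DAG(
        dag_id="dataset_alias_example_alias_consumer",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=[DatasetAlias("example-alias")],
        catchup=False,
        tags=["consumer", "dataset-alias"],
    ):

        @task
        def consume_dataset_alias_event():
            pass

        consume_dataset_alias_event()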

(airflow) branch main updated: Improve logging in _register_dataset_changes (#41089)

2024-07-29 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new c0cc614b2d Improve logging in _register_dataset_changes (#41089)
c0cc614b2d is described below

commit c0cc614b2d494d610431fc2796388b86b64af731
Author: Tzu-ping Chung 
AuthorDate: Mon Jul 29 16:13:44 2024 +0800

Improve logging in _register_dataset_changes (#41089)

Logging should be smart enough to render rich objects directly, so we
don't need to format the object string ourselves. This makes the log
messages a little longer, but should be acceptable.

This also fixed a typo in the message ("did not exists").
---
 airflow/models/taskinstance.py | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 69bb7b03ff..5b00dba557 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2996,7 +2996,7 @@ class TaskInstance(Base, LoggingMixin):
 if not dataset_obj:
 dataset_obj = DatasetModel(uri=uri)
 dataset_manager.create_datasets(dataset_models=[dataset_obj], 
session=session)
-self.log.warning('Created a new Dataset(uri="%s") as it did 
not exists.', uri)
+self.log.warning("Created a new %r as it did not exist.", 
dataset_obj)
 dataset_objs_cache[uri] = dataset_obj
 
 for alias in alias_names:
@@ -3007,9 +3007,8 @@ class TaskInstance(Base, LoggingMixin):
 
 extra = {k: v for k, v in extra_items}
 self.log.info(
-'Create dataset event Dataset(uri="%s", extra="%s") through 
dataset aliases "%s"',
-uri,
-extra,
+'Creating event for %r through aliases "%s"',
+dataset_obj,
 ", ".join(alias_names),
 )
 dataset_manager.register_dataset_change(



(airflow) branch main updated: add section "Manipulating queued dataset events through REST API" (#41022)

2024-07-28 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 129911b2bf add section "Manipulating queued dataset events through 
REST API" (#41022)
129911b2bf is described below

commit 129911b2bf9ce4fe1fd3d377e9e1443c27f5ceb7
Author: Wei Lee 
AuthorDate: Mon Jul 29 09:37:37 2024 +0800

add section "Manipulating queued dataset events through REST API" (#41022)
---
 .../authoring-and-scheduling/datasets.rst  | 28 ++
 1 file changed, 28 insertions(+)

diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst 
b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index 25d6fb90d4..53b4ad38cd 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -321,6 +321,34 @@ Example:
 Note that this example is using `(.values() | first | first) 
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ 
to fetch the first of one dataset given to the DAG, and the first of one 
DatasetEvent for that dataset. An implementation can be quite complex if you 
have multiple datasets, potentially with multiple DatasetEvents.
 
 
+Manipulating queued dataset events through REST API
+----------------------------------------------------
+
+.. versionadded:: 2.9
+
+In this example, the DAG ``waiting_for_dataset_1_and_2`` will be triggered when tasks update both datasets "dataset-1" and "dataset-2". Once "dataset-1" is updated, Airflow creates a record. This ensures that Airflow knows to trigger the DAG when "dataset-2" is updated. We call such records queued dataset events.
+
+.. code-block:: python
+
+with DAG(
+dag_id="waiting_for_dataset_1_and_2",
+schedule=[Dataset("dataset-1"), Dataset("dataset-2")],
+...,
+):
+...
+
+
+``queuedEvent`` API endpoints are introduced to manipulate such records.
+
+* Get a queued Dataset event for a DAG: ``/datasets/queuedEvent/{uri}``
+* Get queued Dataset events for a DAG: ``/dags/{dag_id}/datasets/queuedEvent``
+* Delete a queued Dataset event for a DAG: ``/datasets/queuedEvent/{uri}``
+* Delete queued Dataset events for a DAG: 
``/dags/{dag_id}/datasets/queuedEvent``
+* Get queued Dataset events for a Dataset: 
``/dags/{dag_id}/datasets/queuedEvent/{uri}``
+* Delete queued Dataset events for a Dataset: ``DELETE 
/dags/{dag_id}/datasets/queuedEvent/{uri}``
+
+ For how to use the REST API and the parameters needed for these endpoints, please refer to :doc:`Airflow API `
+
 Advanced dataset scheduling with conditional expressions
 
 

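To make the queuedEvent endpoints above concrete, here is a hedged sketch of calling two of them with the requests library. The base URL, basic-auth credentials, and dataset name are assumptions for illustration only; the paths come from the list in the section above:

    import urllib.parse

    import requests

    BASE_URL = "http://localhost:8080/api/v1"  # assumed local webserver
    AUTH = ("admin", "admin")  # assumed basic-auth user

    dag_id = "waiting_for_dataset_1_and_2"
    dataset_uri = urllib.parse.quote("dataset-1", safe="")

    # List the queued dataset events currently recorded for the DAG.
    resp = requests.get(f"{BASE_URL}/dags/{dag_id}/datasets/queuedEvent", auth=AUTH)
    resp.raise_for_status()
    print(resp.json())

    # Delete the queued event of one dataset for that DAG, e.g. to reset a
    # half-satisfied schedule.
    resp = requests.delete(f"{BASE_URL}/dags/{dag_id}/datasets/queuedEvent/{dataset_uri}", auth=AUTH)
    resp.raise_for_status()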


(airflow) branch main updated (277e746fa4 -> f0ef69198e)

2024-07-26 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 277e746fa4 openlineage: update docs on openlineage methods (#41051)
 add f0ef69198e Check dataset_alias exists in inlets when use it to 
retrieve inlet_events (#41043)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/context.py  | 3 ++-
 tests/models/test_taskinstance.py | 2 ++
 2 files changed, 4 insertions(+), 1 deletion(-)



(airflow) branch main updated (fa927276b5 -> cbc4c891ae)

2024-07-26 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from fa927276b5 Enhance start_trigger_args serialization (#40993)
 add cbc4c891ae Add string representation to dataset alias (#41041)

No new revisions were added by this update.

Summary of changes:
 airflow/models/dataset.py | 4 
 1 file changed, 4 insertions(+)



(airflow) branch main updated: Enable ending the task directly from the triggerer without going into the worker. (#40084)

2024-07-24 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 0fba616ccb Enable ending the task directly from the triggerer without 
going into the worker. (#40084)
0fba616ccb is described below

commit 0fba616ccb2117d6225d4e14e8e539acda4a9fd7
Author: Ankit Chaurasia <8670962+sunank...@users.noreply.github.com>
AuthorDate: Thu Jul 25 09:12:14 2024 +0545

Enable ending the task directly from the triggerer without going into the 
worker. (#40084)
---
 airflow/callbacks/callback_requests.py |  19 +++-
 airflow/dag_processing/processor.py|  37 ++-
 airflow/exceptions.py  |   5 +-
 airflow/models/baseoperator.py |   5 +-
 airflow/models/dagbag.py   |  12 +++
 airflow/models/taskinstance.py |   9 +-
 airflow/models/trigger.py  |   9 +-
 airflow/sensors/date_time.py   |  15 +--
 airflow/sensors/time_delta.py  |  13 ++-
 airflow/sensors/time_sensor.py |  13 +--
 airflow/triggers/base.py   | 118 -
 airflow/triggers/temporal.py   |  31 --
 .../authoring-and-scheduling/deferring.rst |  53 +
 tests/callbacks/test_callback_requests.py  |   2 -
 tests/models/test_trigger.py   |  64 ++-
 tests/sensors/test_time_sensor.py  |   2 +-
 tests/triggers/test_temporal.py|  21 ++--
 17 files changed, 368 insertions(+), 60 deletions(-)

diff --git a/airflow/callbacks/callback_requests.py 
b/airflow/callbacks/callback_requests.py
index 8ec0187978..7158c45d44 100644
--- a/airflow/callbacks/callback_requests.py
+++ b/airflow/callbacks/callback_requests.py
@@ -19,6 +19,8 @@ from __future__ import annotations
 import json
 from typing import TYPE_CHECKING
 
+from airflow.utils.state import TaskInstanceState
+
 if TYPE_CHECKING:
 from airflow.models.taskinstance import SimpleTaskInstance
 
@@ -68,22 +70,33 @@ class TaskCallbackRequest(CallbackRequest):
 
 :param full_filepath: File Path to use to run the callback
 :param simple_task_instance: Simplified Task Instance representation
-:param is_failure_callback: Flag to determine whether it is a Failure 
Callback or Success Callback
 :param msg: Additional Message that can be used for logging to determine 
failure/zombie
 :param processor_subdir: Directory used by Dag Processor when parsed the 
dag.
+:param task_callback_type: e.g. whether on success, on failure, on retry.
 """
 
 def __init__(
 self,
 full_filepath: str,
 simple_task_instance: SimpleTaskInstance,
-is_failure_callback: bool | None = True,
 processor_subdir: str | None = None,
 msg: str | None = None,
+task_callback_type: TaskInstanceState | None = None,
 ):
 super().__init__(full_filepath=full_filepath, 
processor_subdir=processor_subdir, msg=msg)
 self.simple_task_instance = simple_task_instance
-self.is_failure_callback = is_failure_callback
+self.task_callback_type = task_callback_type
+
+@property
+def is_failure_callback(self) -> bool:
+"""Returns True if the callback is a failure callback."""
+if self.task_callback_type is None:
+return True
+return self.task_callback_type in {
+TaskInstanceState.FAILED,
+TaskInstanceState.UP_FOR_RETRY,
+TaskInstanceState.UPSTREAM_FAILED,
+}
 
 def to_json(self) -> str:
 from airflow.serialization.serialized_objects import BaseSerialization
diff --git a/airflow/dag_processing/processor.py 
b/airflow/dag_processing/processor.py
index 84049de4e2..3cc2fe142b 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -47,7 +47,7 @@ from airflow.models.dagrun import DagRun as DR
 from airflow.models.dagwarning import DagWarning, DagWarningType
 from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
+from airflow.models.taskinstance import TaskInstance, TaskInstance as TI, 
_run_finished_callback
 from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.email import get_email_address_list, send_email
@@ -808,8 +808,26 @@ class DagFileProcessor(LoggingMixin):
 @provide_session
 def _execute_task_callbacks(
 cls, dagbag: DagBag | None, request: TaskCallbackRequest, 
unit_test_mode: bool, session: Session
-):
-if not request.is_failure_callbac

(airflow) branch main updated (7f20b1eed8 -> e9d2d5c86a)

2024-07-23 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 7f20b1eed8 Extend get_datasets endpoint to include dataset aliases 
(#40830)
 add e9d2d5c86a Retrieve inlet dataset events through dataset aliases 
(#40809)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/context.py   |  40 --
 .../authoring-and-scheduling/datasets.rst  |  24 
 tests/models/test_taskinstance.py  | 146 +
 3 files changed, 202 insertions(+), 8 deletions(-)
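
For context, a rough sketch of what consuming inlet events through a dataset alias can look like after this change. The dag_id is made up, and the exact indexing and attribute access on inlet_events are assumptions based on the docs change listed in the summary:

    from __future__ import annotations

    import pendulum

    from airflow import DAG
    from airflow.datasets import DatasetAlias
    from airflow.decorators import task

    with DAG(
        dag_id="read_dataset_alias_events",  # hypothetical dag_id
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=None,
        catchup=False,
    ):

        @task(inlets=[DatasetAlias("example-alias")])
        def read_events_through_alias(*, inlet_events=None):
            # Events attached to the alias by upstream producers; empty while the
            # alias has not resolved to any dataset yet.
            events = inlet_events[DatasetAlias("example-alias")]
            if events:
                print(events[-1].extra)

        read_events_through_alias()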



(airflow) branch main updated: Extend get_datasets endpoint to include dataset aliases (#40830)

2024-07-22 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 7f20b1eed8 Extend get_datasets endpoint to include dataset aliases 
(#40830)
7f20b1eed8 is described below

commit 7f20b1eed8e05e6c31e4632a721869841ca8412b
Author: Wei Lee 
AuthorDate: Tue Jul 23 14:36:37 2024 +0800

Extend get_datasets endpoint to include dataset aliases (#40830)
---
 airflow/api_connexion/schemas/dataset_schema.py| 14 ++
 tests/api_connexion/endpoints/test_dataset_endpoint.py |  7 +--
 tests/api_connexion/schemas/test_dataset_schema.py | 14 +-
 tests/www/views/test_views_dataset.py  |  2 +-
 4 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/airflow/api_connexion/schemas/dataset_schema.py 
b/airflow/api_connexion/schemas/dataset_schema.py
index fc80ccc672..b8aaf2f8fa 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -26,6 +26,7 @@ from airflow.api_connexion.schemas.common_schema import 
JsonObjectField
 from airflow.models.dagrun import DagRun
 from airflow.models.dataset import (
 DagScheduleDatasetReference,
+DatasetAliasModel,
 DatasetEvent,
 DatasetModel,
 TaskOutletDatasetReference,
@@ -59,6 +60,18 @@ class DagScheduleDatasetReferenceSchema(SQLAlchemySchema):
 updated_at = auto_field()
 
 
+class DatasetAliasSchema(SQLAlchemySchema):
+"""DatasetAlias DB schema."""
+
+class Meta:
+"""Meta."""
+
+model = DatasetAliasModel
+
+id = auto_field()
+name = auto_field()
+
+
 class DatasetSchema(SQLAlchemySchema):
 """Dataset DB schema."""
 
@@ -74,6 +87,7 @@ class DatasetSchema(SQLAlchemySchema):
 updated_at = auto_field()
 producing_tasks = 
fields.List(fields.Nested(TaskOutletDatasetReferenceSchema))
 consuming_dags = 
fields.List(fields.Nested(DagScheduleDatasetReferenceSchema))
+aliases = fields.List(fields.Nested(DatasetAliasSchema))
 
 
 class DatasetCollection(NamedTuple):
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py 
b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 5b6e2f2414..fa278a92d6 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -109,7 +109,7 @@ class TestGetDatasetEndpoint(TestDatasetEndpoint):
 self._create_dataset(session)
 assert session.query(DatasetModel).count() == 1
 
-with assert_queries_count(5):
+with assert_queries_count(6):
 response = self.client.get(
 f"/api/v1/datasets/{urllib.parse.quote('s3://bucket/key', 
safe='')}",
 environ_overrides={"REMOTE_USER": "test"},
@@ -123,6 +123,7 @@ class TestGetDatasetEndpoint(TestDatasetEndpoint):
 "updated_at": self.default_time,
 "consuming_dags": [],
 "producing_tasks": [],
+"aliases": [],
 }
 
 def test_should_respond_404(self):
@@ -176,7 +177,7 @@ class TestGetDatasets(TestDatasetEndpoint):
 session.commit()
 assert session.query(DatasetModel).count() == 2
 
-with assert_queries_count(8):
+with assert_queries_count(10):
 response = self.client.get("/api/v1/datasets", 
environ_overrides={"REMOTE_USER": "test"})
 
 assert response.status_code == 200
@@ -191,6 +192,7 @@ class TestGetDatasets(TestDatasetEndpoint):
 "updated_at": self.default_time,
 "consuming_dags": [],
 "producing_tasks": [],
+"aliases": [],
 },
 {
 "id": 2,
@@ -200,6 +202,7 @@ class TestGetDatasets(TestDatasetEndpoint):
 "updated_at": self.default_time,
 "consuming_dags": [],
 "producing_tasks": [],
+"aliases": [],
 },
 ],
 "total_entries": 2,
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py 
b/tests/api_connexion/schemas/test_dataset_schema.py
index 2a88dd9865..087fb8a840 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -28,7 +28,7 @@ from airflow.api_connexion.schemas.dataset_schema import (
 dataset_schema,
 )
 from airflow.datasets import Dataset
-from airflow.models.dataset import DatasetEvent, DatasetModel
+from a

(airflow) branch main updated (85b2666eab -> 05a5df86c9)

2024-07-22 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 85b2666eab Add start execution from triggerer support to dynamic task 
mapping (#39912)
 add 05a5df86c9 Fix dataset_with_extra_from_classic_operator example DAG 
(#40747)

No new revisions were added by this update.

Summary of changes:
 airflow/example_dags/example_outlet_event_extra.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



(airflow) branch main updated: Add start execution from triggerer support to dynamic task mapping (#39912)

2024-07-22 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 85b2666eab Add start execution from triggerer support to dynamic task 
mapping (#39912)
85b2666eab is described below

commit 85b2666eabc655a99a31609a7a27a3c577c1eefb
Author: Wei Lee 
AuthorDate: Mon Jul 22 16:29:37 2024 +0800

Add start execution from triggerer support to dynamic task mapping (#39912)

* feat(dagrun): add start_from_trigger support to mapped operator
* feat(mapped_operator): add partial support to start_trigger_args
* feat(mappedoperator): do not include xcom when expanding start trigger 
args and flag
---
 airflow/decorators/base.py |  6 +-
 airflow/models/abstractoperator.py | 22 +++
 airflow/models/baseoperator.py | 22 +++
 airflow/models/dagrun.py   | 20 --
 airflow/models/expandinput.py  | 31 +
 airflow/models/mappedoperator.py   | 75 --
 airflow/models/param.py|  2 +-
 airflow/models/taskinstance.py | 19 --
 airflow/models/xcom_arg.py | 16 ++---
 airflow/template/templater.py  |  4 +-
 airflow/utils/mixins.py|  2 +-
 .../authoring-and-scheduling/deferring.rst | 46 -
 tests/models/test_dagrun.py| 32 +
 13 files changed, 254 insertions(+), 43 deletions(-)

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 5a20fa55dd..d743acbe50 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -550,11 +550,13 @@ class DecoratedMappedOperator(MappedOperator):
 super(DecoratedMappedOperator, 
DecoratedMappedOperator).__attrs_post_init__(self)
 XComArg.apply_upstream_relationship(self, 
self.op_kwargs_expand_input.value)
 
-def _expand_mapped_kwargs(self, context: Context, session: Session) -> 
tuple[Mapping[str, Any], set[int]]:
+def _expand_mapped_kwargs(
+self, context: Context, session: Session, *, include_xcom: bool
+) -> tuple[Mapping[str, Any], set[int]]:
 # We only use op_kwargs_expand_input so this must always be empty.
 if self.expand_input is not EXPAND_INPUT_EMPTY:
 raise AssertionError(f"unexpected expand_input: 
{self.expand_input}")
-op_kwargs, resolved_oids = super()._expand_mapped_kwargs(context, 
session)
+op_kwargs, resolved_oids = super()._expand_mapped_kwargs(context, 
session, include_xcom=include_xcom)
 return {"op_kwargs": op_kwargs}, resolved_oids
 
 def _get_unmap_kwargs(self, mapped_kwargs: Mapping[str, Any], *, strict: 
bool) -> dict[str, Any]:
diff --git a/airflow/models/abstractoperator.py 
b/airflow/models/abstractoperator.py
index 89e8b6cc72..9cf1830bb4 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -55,6 +55,7 @@ if TYPE_CHECKING:
 from airflow.models.operator import Operator
 from airflow.models.taskinstance import TaskInstance
 from airflow.task.priority_strategy import PriorityWeightStrategy
+from airflow.triggers.base import StartTriggerArgs
 from airflow.utils.task_group import TaskGroup
 
 DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner")
@@ -427,6 +428,27 @@ class AbstractOperator(Templater, DAGNode):
 """
 raise NotImplementedError()
 
+def expand_start_from_trigger(self, *, context: Context, session: Session) 
-> bool:
+"""
+Get the start_from_trigger value of the current abstract operator.
+
+MappedOperator uses this to unmap start_from_trigger to decide whether 
to start the task
+execution directly from triggerer.
+
+:meta private:
+"""
+raise NotImplementedError()
+
+def expand_start_trigger_args(self, *, context: Context, session: Session) 
-> StartTriggerArgs | None:
+"""
+Get the start_trigger_args value of the current abstract operator.
+
+MappedOperator uses this to unmap start_trigger_args to decide how to 
start a task from triggerer.
+
+:meta private:
+"""
+raise NotImplementedError()
+
 @property
 def priority_weight_total(self) -> int:
 """
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 30ab591867..8525b78f60 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1795,6 +1795,28 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 """

(airflow) branch main updated: Add DatasetAlias to support dynamic Dataset Event Emission and Dataset Creation (#40478)

2024-07-14 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 3805050f34 Add DatasetAlias to support dynamic Dataset Event Emission 
and Dataset Creation (#40478)
3805050f34 is described below

commit 3805050f34dcb575aaad690c6ad1e37f75f3b2cf
Author: Wei Lee 
AuthorDate: Mon Jul 15 08:56:33 2024 +0800

Add DatasetAlias to support dynamic Dataset Event Emission and Dataset 
Creation (#40478)

* feat(dataset_alias)
* add DatasetAlias class
* support yield dataset alias through datasets.Metadata
* allow only one dataset event to be triggered for the same dataset with the same extra in a single task
* dynamically adding dataset through dataset_alias
* feat(datasets): add optional alias argument to dataset metadata
* feat(dag): add dataset aliases defined to db during dag parsing
* feat(datasets): register dataset change through dataset alias in outlet 
event
---
 airflow/datasets/__init__.py   |  34 -
 airflow/datasets/manager.py|   7 +-
 airflow/datasets/metadata.py   |  13 +-
 .../versions/0147_2_10_0_add_dataset_alias.py  |  59 +
 airflow/models/dag.py  |  52 +++-
 airflow/models/dataset.py  |  30 -
 airflow/models/taskinstance.py |  43 ++-
 airflow/serialization/enums.py |   1 +
 airflow/serialization/serialized_objects.py|  28 -
 airflow/utils/context.py   |  43 +--
 airflow/utils/context.pyi  |  15 ++-
 airflow/utils/db.py|   2 +-
 airflow/utils/operator_helpers.py  |   4 +
 .../authoring-and-scheduling/datasets.rst  |  58 +
 docs/apache-airflow/img/airflow_erd.sha256 |   2 +-
 docs/apache-airflow/img/airflow_erd.svg| 104 ---
 docs/apache-airflow/migrations-ref.rst |   4 +-
 tests/models/test_dag.py   |  55 +++-
 .../metadata.py => tests/models/test_dataset.py|  24 ++--
 tests/models/test_taskinstance.py  | 139 +
 tests/serialization/test_serialized_objects.py |  31 -
 tests/utils/test_context.py|  75 +++
 tests/utils/test_db_cleanup.py |   1 +
 23 files changed, 731 insertions(+), 93 deletions(-)

diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 3a85b824ab..7e26df496b 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -24,6 +24,8 @@ from typing import TYPE_CHECKING, Any, Callable, ClassVar, 
Iterable, Iterator
 
 import attr
 
+from airflow.typing_compat import TypedDict
+
 if TYPE_CHECKING:
 from urllib.parse import SplitResult
 
@@ -106,16 +108,20 @@ def _sanitize_uri(uri: str) -> str:
 return urllib.parse.urlunsplit(parsed)
 
 
-def coerce_to_uri(value: str | Dataset) -> str:
+def extract_event_key(value: str | Dataset | DatasetAlias) -> str:
 """
-Coerce a user input into a sanitized URI.
+Extract the key of an inlet or an outlet event.
 
 If the input value is a string, it is treated as a URI and sanitized. If 
the
 input is a :class:`Dataset`, the URI it contains is considered sanitized 
and
-returned directly.
+returned directly. If the input is a :class:`DatasetAlias`, the name it 
contains
+will be returned directly.
 
 :meta private:
 """
+if isinstance(value, DatasetAlias):
+return value.name
+
 if isinstance(value, Dataset):
 return value.uri
 return _sanitize_uri(str(value))
@@ -159,6 +165,28 @@ class BaseDataset:
 raise NotImplementedError
 
 
+@attr.define()
+class DatasetAlias(BaseDataset):
+"""A representation of a dataset alias which is used to create datasets at runtime."""
+
+name: str
+
+def __eq__(self, other: Any) -> bool:
+if isinstance(other, DatasetAlias):
+return self.name == other.name
+return NotImplemented
+
+def __hash__(self) -> int:
+return hash(self.name)
+
+
+class DatasetAliasEvent(TypedDict):
+"""A representation of a dataset event to be triggered by a dataset alias."""
+
+source_alias_name: str
+dest_dataset_uri: str
+
+
 @attr.define()
 class Dataset(os.PathLike, BaseDataset):
 """A representation of data dependencies between workflows."""
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index d3e7a8cf84..2861b58171 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager
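
The commit bullets mention yielding dataset aliases through datasets.Metadata. A short sketch of that pattern, assuming the optional alias argument behaves as described in the bullets (the dag_id and extra payload are made up):

    from __future__ import annotations

    import pendulum

    from airflow import DAG
    from airflow.datasets import Dataset, DatasetAlias
    from airflow.datasets.metadata import Metadata
    from airflow.decorators import task

    with DAG(
        dag_id="dataset_alias_metadata_producer",  # hypothetical dag_id
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=None,
        catchup=False,
    ):

        @task(outlets=[DatasetAlias("example-alias")])
        def produce_via_metadata():
            # Yielding Metadata emits a dataset event for the given dataset and
            # associates it with the alias declared in the task's outlets.
            yield Metadata(Dataset("s3://bucket/my-task"), extra={"row_count": 3}, alias="example-alias")

        produce_via_metadata()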

(airflow) branch main updated (43eaa60967 -> 07e6eb8e36)

2024-07-01 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 43eaa60967 test(weaviate): fix system tests (#40517)
 add 07e6eb8e36 Add notes about passing secrets via environment variables 
(#40519)

No new revisions were added by this update.

Summary of changes:
 docs/apache-airflow-providers-cncf-kubernetes/operators.rst   |  9 +
 .../apache-airflow/security/secrets/mask-sensitive-values.rst | 11 +++
 2 files changed, 20 insertions(+)



(airflow) branch main updated (a1e6e598ed -> 3133c33684)

2024-06-24 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from a1e6e598ed Revert "fix: scheduler crashing with OL provider on airflow 
standalone (#40353)" (#40402)
 add 3133c33684 docs: fix typo in upgrading.rst (#40399)

No new revisions were added by this update.

Summary of changes:
 docs/apache-airflow/installation/upgrading.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow) branch main updated: add delete index teardown to pinecone system tests (#40396)

2024-06-24 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 7904828f05 add delete index teardown to pinecone system tests (#40396)
7904828f05 is described below

commit 7904828f053cfb9a22275bfa35cf7f6a84e8a1eb
Author: vatsrahul1001 <43964496+vatsrahul1...@users.noreply.github.com>
AuthorDate: Mon Jun 24 15:30:58 2024 +0530

add delete index teardown to pinecone system tests (#40396)
---
 tests/system/providers/pinecone/example_create_pod_index.py | 13 -
 .../providers/pinecone/example_create_serverless_index.py   | 13 -
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/tests/system/providers/pinecone/example_create_pod_index.py 
b/tests/system/providers/pinecone/example_create_pod_index.py
index 9b6f7d7d88..a2d7f16c69 100644
--- a/tests/system/providers/pinecone/example_create_pod_index.py
+++ b/tests/system/providers/pinecone/example_create_pod_index.py
@@ -20,6 +20,7 @@ import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task, teardown
 from airflow.providers.pinecone.operators.pinecone import 
CreatePodIndexOperator
 
 index_name = os.getenv("INDEX_NAME", "test")
@@ -33,7 +34,7 @@ with DAG(
 ) as dag:
 # [START howto_operator_create_pod_index]
 # reference: 
https://docs.pinecone.io/reference/api/control-plane/create_index
-CreatePodIndexOperator(
+create_index = CreatePodIndexOperator(
 task_id="pinecone_create_pod_index",
 index_name=index_name,
 dimension=3,
@@ -44,6 +45,16 @@ with DAG(
 )
 # [END howto_operator_create_pod_index]
 
+@teardown
+@task
+def delete_index():
+from airflow.providers.pinecone.hooks.pinecone import PineconeHook
+
+hook = PineconeHook()
+hook.delete_index(index_name=index_name)
+
+create_index >> delete_index()
+
 
 from tests.system.utils import get_test_run  # noqa: E402
 
diff --git a/tests/system/providers/pinecone/example_create_serverless_index.py 
b/tests/system/providers/pinecone/example_create_serverless_index.py
index a7924e63ef..cf1e2c5cee 100644
--- a/tests/system/providers/pinecone/example_create_serverless_index.py
+++ b/tests/system/providers/pinecone/example_create_serverless_index.py
@@ -20,6 +20,7 @@ import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task, teardown
 from airflow.providers.pinecone.operators.pinecone import 
CreateServerlessIndexOperator
 
 index_name = os.getenv("INDEX_NAME", "test")
@@ -33,7 +34,7 @@ with DAG(
 ) as dag:
 # [START howto_operator_create_serverless_index]
 # reference: 
https://docs.pinecone.io/reference/api/control-plane/create_index
-CreateServerlessIndexOperator(
+create_index = CreateServerlessIndexOperator(
 task_id="pinecone_create_serverless_index",
 index_name=index_name,
 dimension=128,
@@ -43,6 +44,16 @@ with DAG(
 )
 # [END howto_operator_create_serverless_index]
 
+@teardown
+@task
+def delete_index():
+from airflow.providers.pinecone.hooks.pinecone import PineconeHook
+
+hook = PineconeHook()
+hook.delete_index(index_name=index_name)
+
+create_index >> delete_index()
+
 
 from tests.system.utils import get_test_run  # noqa: E402
 



(airflow) branch main updated: Introduce StartTriggerArgs and prevent start trigger initialization in scheduler (#39585)

2024-06-11 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d509abfa21 Introduce StartTriggerArgs and prevent start trigger 
initialization in scheduler (#39585)
d509abfa21 is described below

commit d509abfa217565d2d5249c639ea7459c44292368
Author: Wei Lee 
AuthorDate: Tue Jun 11 20:10:01 2024 +0900

Introduce StartTriggerArgs and prevent start trigger initialization in 
scheduler (#39585)

* fix(baseoperator): change `start_trigger` into `start_trigger_args`
* feat(baseoperator): add `start_from_trigger` as the flag to decide 
whether to start task execution from triggerer
* fix(dagrun): set start_date before deferring task from scheduler
---
 airflow/decorators/base.py |  4 +-
 airflow/models/abstractoperator.py |  5 +-
 airflow/models/baseoperator.py | 10 +--
 airflow/models/dagrun.py   | 16 +---
 airflow/models/mappedoperator.py   | 17 ++--
 airflow/models/taskinstance.py | 39 ++---
 airflow/serialization/serialized_objects.py| 31 +++
 airflow/triggers/base.py   | 20 +
 .../authoring-and-scheduling/deferring.rst | 36 +---
 tests/models/test_dagrun.py| 17 ++--
 tests/serialization/test_dag_serialization.py  | 99 +++---
 tests/serialization/test_pydantic_models.py|  4 +-
 12 files changed, 168 insertions(+), 130 deletions(-)

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 2ae85a9c43..74b44ffe23 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -509,8 +509,8 @@ class _TaskDecorator(ExpandableFactory, Generic[FParams, 
FReturn, OperatorSubcla
 # task's expand() contribute to the op_kwargs operator argument, 
not
 # the operator arguments themselves, and should expand against it.
 expand_input_attr="op_kwargs_expand_input",
-start_trigger=self.operator_class.start_trigger,
-next_method=self.operator_class.next_method,
+start_trigger_args=self.operator_class.start_trigger_args,
+start_from_trigger=self.operator_class.start_from_trigger,
 )
 return XComArg(operator=operator)
 
diff --git a/airflow/models/abstractoperator.py 
b/airflow/models/abstractoperator.py
index b7160430e0..1bb83a2dc0 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -122,8 +122,9 @@ class AbstractOperator(Templater, DAGNode):
 "node_id",  # Duplicates task_id
 "task_group",  # Doesn't have a useful repr, no point showing in UI
 "inherits_from_empty_operator",  # impl detail
-"start_trigger",
-"next_method",
+# Decide whether to start task execution from triggerer
+"start_trigger_args",
+"start_from_trigger",
 # For compatibility with TG, for operators these are just the 
current task, no point showing
 "roots",
 "leaves",
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 98532d90b0..bbd629cfc1 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -116,7 +116,7 @@ if TYPE_CHECKING:
 from airflow.models.operator import Operator
 from airflow.models.xcom_arg import XComArg
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
-from airflow.triggers.base import BaseTrigger
+from airflow.triggers.base import BaseTrigger, StartTriggerArgs
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.types import ArgNotSet
 
@@ -819,8 +819,8 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 # Set to True for an operator instantiated by a mapped operator.
 __from_mapped = False
 
-start_trigger: BaseTrigger | None = None
-next_method: str | None = None
+start_trigger_args: StartTriggerArgs | None = None
+start_from_trigger: bool = False
 
 def __init__(
 self,
@@ -1679,9 +1679,9 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 "is_teardown",
 "on_failure_fail_dagrun",
 "map_index_template",
-"start_trigger",
-"next_method",
+"start_trigger_args",
 "_needs_expansion",
+"start_from_trigger",
 }
 )
 DagContext.pop_context_managed_dag()
dif
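
For readers skimming the archive, a minimal sketch of an operator using the new start_trigger_args / start_from_trigger attributes. The field names come from the diff above; the trigger choice and values are assumptions modeled on the deferring documentation touched by this commit:

    from __future__ import annotations

    from datetime import timedelta

    from airflow.sensors.base import BaseSensorOperator
    from airflow.triggers.base import StartTriggerArgs


    class WaitOneHourSensor(BaseSensorOperator):
        # Serialized by the scheduler; the trigger is only instantiated inside the
        # triggerer process, which is what this change enables.
        start_trigger_args = StartTriggerArgs(
            trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
            trigger_kwargs={"delta": timedelta(hours=1)},
            next_method="execute_complete",
            next_kwargs=None,
            timeout=None,
        )
        start_from_trigger = True

        def execute_complete(self, context, event=None) -> None:
            # Runs on a worker after the trigger fires.
            return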

(airflow) branch main updated: Fix minor typo in dags.rst (#40169)

2024-06-11 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new e9e3c937f3 Fix minor typo in dags.rst (#40169)
e9e3c937f3 is described below

commit e9e3c937f31f07b9235ffc74ac70482c864800e9
Author: bangjiehan 
AuthorDate: Tue Jun 11 17:40:27 2024 +0800

Fix minor typo in dags.rst (#40169)
---
 docs/apache-airflow/core-concepts/dags.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/core-concepts/dags.rst 
b/docs/apache-airflow/core-concepts/dags.rst
index 5c989bab7a..482b604f33 100644
--- a/docs/apache-airflow/core-concepts/dags.rst
+++ b/docs/apache-airflow/core-concepts/dags.rst
@@ -816,7 +816,7 @@ doesn't support many advanced features, please check its
 
 With the ``glob`` syntax, the patterns work just like those in a 
``.gitignore`` file:
 
-* The ``*`` character will any number of characters, except ``/``
+* The ``*`` character will match any number of characters, except ``/``
 * The ``?`` character will match any single character, except ``/``
 * The range notation, e.g. ``[a-zA-Z]``, can be used to match one of the 
characters in a range
 * A pattern can be negated by prefixing with ``!``. Patterns are evaluated in 
order so



(airflow) branch main updated: Bump google-ads version to use v17 by default (#40158)

2024-06-11 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 8e7b48984b Bump google-ads version to use v17 by default (#40158)
8e7b48984b is described below

commit 8e7b48984b86dafba2bda298547c5d2bee44363a
Author: M. Olcay Tercanlı 
AuthorDate: Tue Jun 11 11:15:39 2024 +0200

Bump google-ads version to use v17 by default (#40158)
---
 airflow/providers/google/ads/hooks/ads.py | 8 
 airflow/providers/google/provider.yaml| 2 +-
 generated/provider_dependencies.json  | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/google/ads/hooks/ads.py 
b/airflow/providers/google/ads/hooks/ads.py
index 6d598529af..71448b44eb 100644
--- a/airflow/providers/google/ads/hooks/ads.py
+++ b/airflow/providers/google/ads/hooks/ads.py
@@ -32,9 +32,9 @@ from airflow.hooks.base import BaseHook
 from airflow.providers.google.common.hooks.base_google import get_field
 
 if TYPE_CHECKING:
-from google.ads.googleads.v16.services.services.customer_service import 
CustomerServiceClient
-from google.ads.googleads.v16.services.services.google_ads_service import 
GoogleAdsServiceClient
-from google.ads.googleads.v16.services.types.google_ads_service import 
GoogleAdsRow
+from google.ads.googleads.v17.services.services.customer_service import 
CustomerServiceClient
+from google.ads.googleads.v17.services.services.google_ads_service import 
GoogleAdsServiceClient
+from google.ads.googleads.v17.services.types.google_ads_service import 
GoogleAdsRow
 from google.api_core.page_iterator import GRPCIterator
 
 
@@ -100,7 +100,7 @@ class GoogleAdsHook(BaseHook):
 :param api_version: The Google Ads API version to use.
 """
 
-default_api_version = "v16"
+default_api_version = "v17"
 
 def __init__(
 self,
diff --git a/airflow/providers/google/provider.yaml 
b/airflow/providers/google/provider.yaml
index 2f3af45277..9435595068 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -104,7 +104,7 @@ dependencies:
   - gcloud-aio-bigquery>=6.1.2
   - gcloud-aio-storage>=9.0.0
   - gcsfs>=2023.10.0
-  - google-ads>=23.1.0
+  - google-ads>=24.1.0
   - google-analytics-admin>=0.9.0
   # Google-api-core 2.16.0 back-compat issue:
   # - https://github.com/googleapis/python-api-core/issues/576
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 0f9b6d03c7..8c97f10641 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -587,7 +587,7 @@
   "gcloud-aio-bigquery>=6.1.2",
   "gcloud-aio-storage>=9.0.0",
   "gcsfs>=2023.10.0",
-  "google-ads>=23.1.0",
+  "google-ads>=24.1.0",
   "google-analytics-admin>=0.9.0",
   "google-api-core>=2.11.0,!=2.16.0,!=2.18.0",
   "google-api-python-client>=2.0.2",



(airflow) branch main updated: Add encryption_configuration parameter to BigQuery operators (#40063)

2024-06-11 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 5c5a495c2c Add encryption_configuration parameter to BigQuery 
operators (#40063)
5c5a495c2c is described below

commit 5c5a495c2cad9faa703eb8ccde47b368fb3eea9a
Author: VladaZakharova <80038284+vladazakhar...@users.noreply.github.com>
AuthorDate: Tue Jun 11 11:06:27 2024 +0200

Add encryption_configuration parameter to BigQuery operators (#40063)
---
 .../providers/google/cloud/operators/bigquery.py   |  86 ---
 .../google/cloud/operators/test_bigquery.py| 164 +
 2 files changed, 233 insertions(+), 17 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py 
b/airflow/providers/google/cloud/operators/bigquery.py
index 846f884e77..fbce7ca768 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -150,7 +150,12 @@ class _BigQueryOperatorsEncryptionConfigurationMixin:
 # annotation of the `self`. Then you can inherit this class in the target 
operator.
 # e.g: BigQueryCheckOperator, BigQueryTableCheckOperator
 def include_encryption_configuration(  # type:ignore[misc]
-self: BigQueryCheckOperator | BigQueryTableCheckOperator,
+self: BigQueryCheckOperator
+| BigQueryTableCheckOperator
+| BigQueryValueCheckOperator
+| BigQueryColumnCheckOperator
+| BigQueryGetDataOperator
+| BigQueryIntervalCheckOperator,
 configuration: dict,
 config_key: str,
 ) -> None:
@@ -206,7 +211,7 @@ class BigQueryCheckOperator(
 Token Creator IAM role to the directly preceding identity, with first
 account from the list granting this role to the originating account. 
(templated)
 :param labels: a dictionary containing labels for the table, passed to 
BigQuery.
-:param encryption_configuration: [Optional] Custom encryption 
configuration (e.g., Cloud KMS keys).
+:param encryption_configuration: (Optional) Custom encryption 
configuration (e.g., Cloud KMS keys).
 
 .. code-block:: python
 
@@ -327,7 +332,9 @@ class BigQueryCheckOperator(
 self.log.info("Success.")
 
 
-class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
+class BigQueryValueCheckOperator(
+_BigQueryDbHookMixin, SQLValueCheckOperator, 
_BigQueryOperatorsEncryptionConfigurationMixin
+):
 """Perform a simple value check using sql code.
 
 .. seealso::
@@ -337,6 +344,13 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, 
SQLValueCheckOperator):
 :param sql: SQL to execute.
 :param use_legacy_sql: Whether to use legacy SQL (true)
 or standard SQL (false).
+:param encryption_configuration: (Optional) Custom encryption 
configuration (e.g., Cloud KMS keys).
+
+.. code-block:: python
+
+encryption_configuration = {
+"kmsKeyName": 
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+}
 :param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud.
 :param location: The geographic location of the job. See details at:
 
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -371,6 +385,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, 
SQLValueCheckOperator):
 sql: str,
 pass_value: Any,
 tolerance: Any = None,
+encryption_configuration: dict | None = None,
 gcp_conn_id: str = "google_cloud_default",
 use_legacy_sql: bool = True,
 location: str | None = None,
@@ -384,6 +399,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, 
SQLValueCheckOperator):
 self.location = location
 self.gcp_conn_id = gcp_conn_id
 self.use_legacy_sql = use_legacy_sql
+self.encryption_configuration = encryption_configuration
 self.impersonation_chain = impersonation_chain
 self.labels = labels
 self.deferrable = deferrable
@@ -402,6 +418,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, 
SQLValueCheckOperator):
 },
 }
 
+self.include_encryption_configuration(configuration, "query")
+
 return hook.insert_job(
 configuration=configuration,
 project_id=hook.project_id,
@@ -461,7 +479,9 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, 
SQLValueCheckOperator):
 )
 
 
-class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, 
SQLIntervalCheckOperator):
+class BigQueryIntervalCheckOperator(
+_BigQueryDbHookMixin, SQLIntervalCheckOperator, 
_BigQueryOperatorsEncryptionConfigurationMixin
+):
 """
 Check
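
The truncated diff above threads encryption_configuration through additional BigQuery operators. A hedged usage sketch for BigQueryValueCheckOperator; the project, table, and KMS key names are placeholders:

    import pendulum

    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator

    with DAG(
        dag_id="bigquery_value_check_with_cmek",  # hypothetical dag_id
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule=None,
        catchup=False,
    ):
        BigQueryValueCheckOperator(
            task_id="check_row_count",
            sql="SELECT COUNT(*) FROM `my-project.my_dataset.my_table`",
            pass_value=100,
            use_legacy_sql=False,
            encryption_configuration={
                "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
            },
        )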

(airflow) branch main updated: fix wrong arguments in read_namespaced_pod_log call (#39874)

2024-05-27 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new e190cff272 fix wrong arguments in read_namespaced_pod_log call (#39874)
e190cff272 is described below

commit e190cff27299256df75b56e46e27e9932174805a
Author: Wei Lee 
AuthorDate: Mon May 27 21:10:58 2024 -0400

fix wrong arguments in read_namespaced_pod_log call (#39874)

* fix(providers/cncf)
* remove unexpected argument pod in read_namespaced_pod_log call
* change wrong argument container_name to container for 
read_namespaced_pod_log
---
 airflow/providers/cncf/kubernetes/operators/pod.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 0cb08033b4..d530f86bcc 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -807,8 +807,7 @@ class KubernetesPodOperator(BaseOperator):
 logs = self.client.read_namespaced_pod_log(
 name=pod.metadata.name,
 namespace=pod.metadata.namespace,
-pod=pod,
-container_name=self.base_container_name,
+container=self.base_container_name,
 follow=follow,
 timestamps=False,
 since_seconds=since_seconds,


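For reference, a minimal sketch of the corrected call shape from the fix above, using the official kubernetes Python client; the pod, namespace, and container names are placeholders:

    from kubernetes import client, config

    config.load_kube_config()  # or config.load_incluster_config() when running inside a cluster
    v1 = client.CoreV1Api()
    logs = v1.read_namespaced_pod_log(
        name="my-pod",
        namespace="default",
        container="base",  # keyword is `container`, not `container_name`
        follow=False,
        timestamps=False,
        since_seconds=60,
    )
    print(logs)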

(airflow) branch main updated: Switch AzureDataLakeStorageV2Hook to use DefaultAzureCredential for managed identity/workload auth (#38497)

2024-05-26 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d5f81a4e2d Switch AzureDataLakeStorageV2Hook to use 
DefaultAzureCredential for managed identity/workload auth (#38497)
d5f81a4e2d is described below

commit d5f81a4e2de0d4236cffcf2e2d3c682b4c6ec355
Author: Tamara Janina Fingerlin <90063506+tja...@users.noreply.github.com>
AuthorDate: Mon May 27 02:28:39 2024 +0200

Switch AzureDataLakeStorageV2Hook to use DefaultAzureCredential for managed 
identity/workload auth (#38497)
---
 .../providers/microsoft/azure/hooks/data_lake.py   |  7 +++---
 .../microsoft/azure/hooks/test_data_factory.py | 29 +-
 2 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py 
b/airflow/providers/microsoft/azure/hooks/data_lake.py
index 054eda087e..b2d9c5aafa 100644
--- a/airflow/providers/microsoft/azure/hooks/data_lake.py
+++ b/airflow/providers/microsoft/azure/hooks/data_lake.py
@@ -22,7 +22,7 @@ from typing import Any, Union
 
 from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
 from azure.datalake.store import core, lib, multithread
-from azure.identity import ClientSecretCredential
+from azure.identity import ClientSecretCredential, DefaultAzureCredential
 from azure.storage.filedatalake import (
 DataLakeDirectoryClient,
 DataLakeFileClient,
@@ -38,9 +38,10 @@ from airflow.providers.microsoft.azure.utils import (
 AzureIdentityCredentialAdapter,
 add_managed_identity_connection_widgets,
 get_field,
+get_sync_default_azure_credential,
 )
 
-Credentials = Union[ClientSecretCredential, AzureIdentityCredentialAdapter]
+Credentials = Union[ClientSecretCredential, AzureIdentityCredentialAdapter, 
DefaultAzureCredential]
 
 
 class AzureDataLakeHook(BaseHook):
@@ -358,7 +359,7 @@ class AzureDataLakeStorageV2Hook(BaseHook):
 else:
 managed_identity_client_id = self._get_field(extra, 
"managed_identity_client_id")
 workload_identity_tenant_id = self._get_field(extra, 
"workload_identity_tenant_id")
-credential = AzureIdentityCredentialAdapter(
+credential = get_sync_default_azure_credential(
 managed_identity_client_id=managed_identity_client_id,
 workload_identity_tenant_id=workload_identity_tenant_id,
 )
diff --git a/tests/providers/microsoft/azure/hooks/test_data_factory.py 
b/tests/providers/microsoft/azure/hooks/test_data_factory.py
index 1ee77ad3af..a7d8786fd8 100644
--- a/tests/providers/microsoft/azure/hooks/test_data_factory.py
+++ b/tests/providers/microsoft/azure/hooks/test_data_factory.py
@@ -86,8 +86,8 @@ def setup_connections(create_mock_connections):
 "factory_name": DEFAULT_FACTORY,
 },
 ),
+# connection_missing_subscription_id
 Connection(
-# connection_missing_subscription_id
 conn_id="azure_data_factory_missing_subscription_id",
 conn_type="azure_data_factory",
 login="clientId",
@@ -110,6 +110,18 @@ def setup_connections(create_mock_connections):
 "factory_name": DEFAULT_FACTORY,
 },
 ),
+# connection_workload_identity
+Connection(
+conn_id="azure_data_factory_workload_identity",
+conn_type="azure_data_factory",
+extra={
+"subscriptionId": "subscriptionId",
+"resource_group_name": DEFAULT_RESOURCE_GROUP,
+"factory_name": DEFAULT_FACTORY,
+"workload_identity_tenant_id": "workload_tenant_id",
+"managed_identity_client_id": "workload_client_id",
+},
+),
 )
 
 
@@ -198,6 +210,21 @@ def 
test_get_conn_by_default_azure_credential(mock_credential):
 mock_create_client.assert_called_with(mock_credential(), 
"subscriptionId")
 
 
+@mock.patch(f"{MODULE}.get_sync_default_azure_credential")
+def test_get_conn_with_workload_identity(mock_credential):
+hook = AzureDataFactoryHook("azure_data_factory_workload_identity")
+with patch.object(hook, "_create_client") as mock_create_client:
+mock_create_client.return_value = MagicMock()
+
+connection = hook.get_conn()
+assert connection is not None
+mock_credential.assert_called_once_with(
+managed_identity_client_id="workload_client_id",
+workload_identity_tenant_id="workload_tenant_id",
+)
+mock_create_client.assert_called_with(mock_credential(), 
"subscriptionId")
+
+
 def test_get_factory(hook: AzureDataFactoryHook):
 hook.get_factory(RESOURCE_GROUP, FACTORY)
 


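A rough sketch of what the hook now does for managed identity/workload identity auth, assuming the standard azure-identity and azure-storage-file-datalake packages; the account URL, client id, and tenant id are placeholders:

    from azure.identity import DefaultAzureCredential
    from azure.storage.filedatalake import DataLakeServiceClient

    credential = DefaultAzureCredential(
        managed_identity_client_id="workload_client_id",
        workload_identity_tenant_id="workload_tenant_id",
    )
    service_client = DataLakeServiceClient(
        account_url="https://myaccount.dfs.core.windows.net",
        credential=credential,
    )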

(airflow) branch main updated: Fix deferrable mode for BeamRunJavaPipelineOperator (#39371)

2024-05-14 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 1489cf7a03 Fix deferrable mode for BeamRunJavaPipelineOperator (#39371)
1489cf7a03 is described below

commit 1489cf7a0372898ab5f905fa7b56f3b1327d2cfe
Author: Maksim 
AuthorDate: Tue May 14 07:53:13 2024 -0700

Fix deferrable mode for BeamRunJavaPipelineOperator (#39371)
---
 airflow/providers/apache/beam/operators/beam.py| 21 +++--
 airflow/providers/apache/beam/triggers/beam.py | 22 --
 tests/providers/apache/beam/operators/test_beam.py | 10 ++
 tests/providers/apache/beam/triggers/test_beam.py  | 13 +
 4 files changed, 38 insertions(+), 28 deletions(-)

diff --git a/airflow/providers/apache/beam/operators/beam.py 
b/airflow/providers/apache/beam/operators/beam.py
index 62f650f19a..af338cdc6d 100644
--- a/airflow/providers/apache/beam/operators/beam.py
+++ b/airflow/providers/apache/beam/operators/beam.py
@@ -546,7 +546,7 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
 if not self.beam_hook:
 raise AirflowException("Beam hook is not defined.")
 if self.deferrable:
-asyncio.run(self.execute_async(context))
+self.execute_async(context)
 else:
 return self.execute_sync(context)
 
@@ -605,23 +605,7 @@ class 
BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
 process_line_callback=self.process_line_callback,
 )
 
-async def execute_async(self, context: Context):
-# Creating a new event loop to manage I/O operations asynchronously
-loop = asyncio.get_event_loop()
-if self.jar.lower().startswith("gs://"):
-gcs_hook = GCSHook(self.gcp_conn_id)
-# Running synchronous `enter_context()` method in a separate
-# thread using the default executor `None`. The 
`run_in_executor()` function returns the
-# file object, which is created using gcs function 
`provide_file()`, asynchronously.
-# This means we can perform asynchronous operations with this file.
-create_tmp_file_call = gcs_hook.provide_file(object_url=self.jar)
-tmp_gcs_file: IO[str] = await loop.run_in_executor(
-None,
-contextlib.ExitStack().enter_context,  # type: ignore[arg-type]
-create_tmp_file_call,
-)
-self.jar = tmp_gcs_file.name
-
+def execute_async(self, context: Context):
 if self.is_dataflow and self.dataflow_hook:
 DataflowJobLink.persist(
 self,
@@ -657,6 +641,7 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
 job_class=self.job_class,
 runner=self.runner,
 check_if_running=self.dataflow_config.check_if_running == 
CheckJobRunning.WaitForRun,
+gcp_conn_id=self.gcp_conn_id,
 ),
 method_name="execute_complete",
 )
diff --git a/airflow/providers/apache/beam/triggers/beam.py 
b/airflow/providers/apache/beam/triggers/beam.py
index 5b1f7a99d5..b160218f73 100644
--- a/airflow/providers/apache/beam/triggers/beam.py
+++ b/airflow/providers/apache/beam/triggers/beam.py
@@ -17,7 +17,8 @@
 from __future__ import annotations
 
 import asyncio
-from typing import Any, AsyncIterator, Sequence
+import contextlib
+from typing import IO, Any, AsyncIterator, Sequence
 
 from deprecated import deprecated
 from google.cloud.dataflow_v1beta3 import ListJobsRequest
@@ -25,6 +26,7 @@ from google.cloud.dataflow_v1beta3 import ListJobsRequest
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook
 from airflow.providers.google.cloud.hooks.dataflow import AsyncDataflowHook
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 
@@ -166,7 +168,7 @@ class BeamJavaPipelineTrigger(BeamPipelineBaseTrigger):
 project_id: str | None = None,
 location: str | None = None,
 job_name: str | None = None,
-gcp_conn_id: str | None = None,
+gcp_conn_id: str = "google_cloud_default",
 impersonation_chain: str | Sequence[str] | None = None,
 poll_sleep: int = 10,
 cancel_timeout: int | None = None,
@@ -233,6 +235,22 @@ class BeamJavaPipelineTrigger(BeamPipelineBaseTrigger):
 if is_running:
 await asyncio.sleep(self.poll_sleep)
 try:
+# Get the current running event loop to manage I/O operations 
asynchronously
+loop = asyncio.get_running_loop()
+  

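A condensed sketch of the pattern the trigger now uses to materialise a gs:// jar without blocking the event loop, mirroring the lines above; the helper name and default connection id are illustrative only:

    import asyncio
    import contextlib

    from airflow.providers.google.cloud.hooks.gcs import GCSHook


    async def provide_local_jar(jar_url: str, gcp_conn_id: str = "google_cloud_default") -> str:
        # provide_file() is a synchronous context manager, so enter it in the default executor.
        gcs_hook = GCSHook(gcp_conn_id)
        loop = asyncio.get_running_loop()
        tmp_file = await loop.run_in_executor(
            None,
            contextlib.ExitStack().enter_context,
            gcs_hook.provide_file(object_url=jar_url),
        )
        return tmp_file.name
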
(airflow) branch main updated: remove extra return (#39582)

2024-05-13 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new dc61da2509 remove extra return (#39582)
dc61da2509 is described below

commit dc61da2509536bb509a4a2e1781ff47aa36b62b4
Author: Kalyan 
AuthorDate: Mon May 13 16:25:41 2024 +0530

remove extra return (#39582)
---
 airflow/providers/microsoft/mssql/hooks/mssql.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/airflow/providers/microsoft/mssql/hooks/mssql.py 
b/airflow/providers/microsoft/mssql/hooks/mssql.py
index d4deb60948..a7a3523704 100644
--- a/airflow/providers/microsoft/mssql/hooks/mssql.py
+++ b/airflow/providers/microsoft/mssql/hooks/mssql.py
@@ -114,7 +114,6 @@ class MsSqlHook(DbApiHook):
 database=self.schema or conn.schema,
 port=conn.port,
 )
-return conn
 
 def set_autocommit(
 self,



(airflow) branch main updated: Fix logic to cancel the external job if the TaskInstance is not in a running or deferred state for DataprocSubmitJobOperator (#39447)

2024-05-08 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 387acd0a362 Fix logic to cancel the external job if the TaskInstance 
is not in a running or deferred state for DataprocSubmitJobOperator (#39447)
387acd0a362 is described below

commit 387acd0a362899347f9444a29688794c86778c3e
Author: Ankit Chaurasia <8670962+sunank...@users.noreply.github.com>
AuthorDate: Wed May 8 14:40:42 2024 +0545

Fix logic to cancel the external job if the TaskInstance is not in a 
running or deferred state for DataprocSubmitJobOperator (#39447)
---
 .../providers/google/cloud/triggers/dataproc.py| 41 +-
 .../google/cloud/triggers/test_dataproc.py |  8 +++--
 2 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/triggers/dataproc.py 
b/airflow/providers/google/cloud/triggers/dataproc.py
index 939e5bbcac7..99800d266a8 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -116,6 +116,41 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
 },
 )
 
+@provide_session
+def get_task_instance(self, session: Session) -> TaskInstance:
+"""
+Get the task instance for the current task.
+
+:param session: Sqlalchemy session
+"""
+query = session.query(TaskInstance).filter(
+TaskInstance.dag_id == self.task_instance.dag_id,
+TaskInstance.task_id == self.task_instance.task_id,
+TaskInstance.run_id == self.task_instance.run_id,
+TaskInstance.map_index == self.task_instance.map_index,
+)
+task_instance = query.one_or_none()
+if task_instance is None:
+raise AirflowException(
+"TaskInstance with dag_id: %s,task_id: %s, run_id: %s and 
map_index: %s is not found",
+self.task_instance.dag_id,
+self.task_instance.task_id,
+self.task_instance.run_id,
+self.task_instance.map_index,
+)
+return task_instance
+
+def safe_to_cancel(self) -> bool:
+"""
+Whether it is safe to cancel the external job which is being executed 
by this trigger.
+
+This is to avoid the case that `asyncio.CancelledError` is called 
because the trigger itself is stopped.
+Because in those cases, we should NOT cancel the external job.
+"""
+# Database query is needed to get the latest state of the task 
instance.
+task_instance = self.get_task_instance()  # type: ignore[call-arg]
+return task_instance.state != TaskInstanceState.DEFERRED
+
 async def run(self):
 try:
 while True:
@@ -131,7 +166,11 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
 except asyncio.CancelledError:
 self.log.info("Task got cancelled.")
 try:
-if self.job_id and self.cancel_on_kill:
+if self.job_id and self.cancel_on_kill and 
self.safe_to_cancel():
+self.log.info(
+"Cancelling the job as it is safe to do so. Note that 
the airflow TaskInstance is not"
+" in deferred state."
+)
 self.log.info("Cancelling the job: %s", self.job_id)
 # The synchronous hook is utilized to delete the cluster 
when a task is cancelled. This
 # is because the asynchronous hook deletion is not awaited 
when the trigger task is
diff --git a/tests/providers/google/cloud/triggers/test_dataproc.py 
b/tests/providers/google/cloud/triggers/test_dataproc.py
index 08294a5ac59..39ed949463c 100644
--- a/tests/providers/google/cloud/triggers/test_dataproc.py
+++ b/tests/providers/google/cloud/triggers/test_dataproc.py
@@ -124,6 +124,7 @@ def submit_trigger():
 region=TEST_REGION,
 gcp_conn_id=TEST_GCP_CONN_ID,
 polling_interval_seconds=TEST_POLL_INTERVAL,
+cancel_on_kill=True,
 )
 
 
@@ -569,12 +570,15 @@ class TestDataprocSubmitTrigger:
 assert event.payload == expected_event.payload
 
 @pytest.mark.asyncio
+@pytest.mark.parametrize("is_safe_to_cancel", [True, False])
 
@mock.patch("airflow.providers.google.cloud.triggers.dataproc.DataprocSubmitTrigger.get_async_hook")
 
@mock.patch("airflow.providers.google.cloud.triggers.dataproc.DataprocSubmitTrigger.get_sync_hook")
+
@mock.patch("airflow.providers.google.cloud.triggers.dataproc.DataprocSubmitTrigger.safe_to_cancel")
 async def test_submit_trigger_run_cancelled(
-

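In condensed form, the cancellation guard added above looks like the sketch below, assuming a trigger that exposes ``job_id``, ``cancel_on_kill``, ``get_sync_hook()`` and the new ``safe_to_cancel()`` helper:

    import asyncio


    async def run(self):
        try:
            while True:
                # ... poll the external Dataproc job and yield TriggerEvents ...
                await asyncio.sleep(self.polling_interval_seconds)
        except asyncio.CancelledError:
            # Cancel the external job only when the TaskInstance has really stopped;
            # DEFERRED means the trigger itself was merely rescheduled.
            if self.job_id and self.cancel_on_kill and self.safe_to_cancel():
                self.get_sync_hook().cancel_job(
                    job_id=self.job_id, project_id=self.project_id, region=self.region
                )
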
(airflow) branch main updated: Fix logic to cancel the external job if the TaskInstance is not in a running or deferred state for BigQueryInsertJobOperator (#39442)

2024-05-08 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new e7aa4d2289 Fix logic to cancel the external job if the TaskInstance is 
not in a running or deferred state for BigQueryInsertJobOperator (#39442)
e7aa4d2289 is described below

commit e7aa4d2289cd4207f11b697729466717889fda38
Author: Ankit Chaurasia <8670962+sunank...@users.noreply.github.com>
AuthorDate: Wed May 8 14:40:29 2024 +0545

Fix logic to cancel the external job if the TaskInstance is not in a 
running or deferred state for BigQueryInsertJobOperator (#39442)
---
 .../providers/google/cloud/triggers/bigquery.py| 60 --
 .../google/cloud/triggers/test_bigquery.py | 40 ++-
 2 files changed, 95 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/google/cloud/triggers/bigquery.py 
b/airflow/providers/google/cloud/triggers/bigquery.py
index e2e0e82f6b..fc19db9881 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -17,13 +17,20 @@
 from __future__ import annotations
 
 import asyncio
-from typing import Any, AsyncIterator, Sequence, SupportsAbs
+from typing import TYPE_CHECKING, Any, AsyncIterator, Sequence, SupportsAbs
 
 from aiohttp import ClientSession
 from aiohttp.client_exceptions import ClientResponseError
 
+from airflow.exceptions import AirflowException
+from airflow.models.taskinstance import TaskInstance
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryAsyncHook, 
BigQueryTableAsyncHook
 from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.utils.session import provide_session
+from airflow.utils.state import TaskInstanceState
+
+if TYPE_CHECKING:
+from sqlalchemy.orm.session import Session
 
 
 class BigQueryInsertJobTrigger(BaseTrigger):
@@ -89,6 +96,36 @@ class BigQueryInsertJobTrigger(BaseTrigger):
 },
 )
 
+@provide_session
+def get_task_instance(self, session: Session) -> TaskInstance:
+query = session.query(TaskInstance).filter(
+TaskInstance.dag_id == self.task_instance.dag_id,
+TaskInstance.task_id == self.task_instance.task_id,
+TaskInstance.run_id == self.task_instance.run_id,
+TaskInstance.map_index == self.task_instance.map_index,
+)
+task_instance = query.one_or_none()
+if task_instance is None:
+raise AirflowException(
+"TaskInstance with dag_id: %s, task_id: %s, run_id: %s and 
map_index: %s is not found",
+self.task_instance.dag_id,
+self.task_instance.task_id,
+self.task_instance.run_id,
+self.task_instance.map_index,
+)
+return task_instance
+
+def safe_to_cancel(self) -> bool:
+"""
+Whether it is safe to cancel the external job which is being executed 
by this trigger.
+
+This is to avoid the case that `asyncio.CancelledError` is called 
because the trigger itself is stopped.
+Because in those cases, we should NOT cancel the external job.
+"""
+# Database query is needed to get the latest state of the task 
instance.
+task_instance = self.get_task_instance()  # type: ignore[call-arg]
+return task_instance.state != TaskInstanceState.DEFERRED
+
 async def run(self) -> AsyncIterator[TriggerEvent]:  # type: 
ignore[override]
 """Get current job execution status and yields a TriggerEvent."""
 hook = self._get_async_hook()
@@ -117,13 +154,27 @@ class BigQueryInsertJobTrigger(BaseTrigger):
 )
 await asyncio.sleep(self.poll_interval)
 except asyncio.CancelledError:
-self.log.info("Task was killed.")
-if self.job_id and self.cancel_on_kill:
+if self.job_id and self.cancel_on_kill and self.safe_to_cancel():
+self.log.info(
+"The job is safe to cancel the as airflow TaskInstance is 
not in deferred state."
+)
+self.log.info(
+"Cancelling job. Project ID: %s, Location: %s, Job ID: %s",
+self.project_id,
+self.location,
+self.job_id,
+)
 await hook.cancel_job(  # type: ignore[union-attr]
 job_id=self.job_id, project_id=self.project_id, 
location=self.location
 )
 else:
-self.log.info("Skipping to cancel job: %s:%s.%s", 
self.project_id, self.location, self.job_id)
+self.log.info(
+"

(airflow) branch main updated: Fix logic to cancel the external job if the TaskInstance is not in a running or deferred state for DataprocCreateClusterOperator (#39446)

2024-05-08 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 3d575fe78c Fix logic to cancel the external job if the TaskInstance is 
not in a running or deferred state for DataprocCreateClusterOperator (#39446)
3d575fe78c is described below

commit 3d575fe78c5fef6796e36901865c45be2b89cbd1
Author: Ankit Chaurasia <8670962+sunank...@users.noreply.github.com>
AuthorDate: Wed May 8 13:27:13 2024 +0545

Fix logic to cancel the external job if the TaskInstance is not in a 
running or deferred state for DataprocCreateClusterOperator (#39446)
---
 .../providers/google/cloud/triggers/dataproc.py| 44 +-
 .../google/cloud/triggers/test_dataproc.py | 35 -
 2 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/triggers/dataproc.py 
b/airflow/providers/google/cloud/triggers/dataproc.py
index 427bf8a096..939e5bbcac 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -22,16 +22,22 @@ from __future__ import annotations
 import asyncio
 import re
 import time
-from typing import Any, AsyncIterator, Sequence
+from typing import TYPE_CHECKING, Any, AsyncIterator, Sequence
 
 from google.api_core.exceptions import NotFound
 from google.cloud.dataproc_v1 import Batch, Cluster, ClusterStatus, JobStatus
 
 from airflow.exceptions import AirflowException
+from airflow.models.taskinstance import TaskInstance
 from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook, 
DataprocHook
 from airflow.providers.google.cloud.utils.dataproc import DataprocOperationType
 from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID
 from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.utils.session import provide_session
+from airflow.utils.state import TaskInstanceState
+
+if TYPE_CHECKING:
+from sqlalchemy.orm.session import Session
 
 
 class DataprocBaseTrigger(BaseTrigger):
@@ -178,6 +184,36 @@ class DataprocClusterTrigger(DataprocBaseTrigger):
 },
 )
 
+@provide_session
+def get_task_instance(self, session: Session) -> TaskInstance:
+query = session.query(TaskInstance).filter(
+TaskInstance.dag_id == self.task_instance.dag_id,
+TaskInstance.task_id == self.task_instance.task_id,
+TaskInstance.run_id == self.task_instance.run_id,
+TaskInstance.map_index == self.task_instance.map_index,
+)
+task_instance = query.one_or_none()
+if task_instance is None:
+raise AirflowException(
+"TaskInstance with dag_id: %s,task_id: %s, run_id: %s and 
map_index: %s is not found.",
+self.task_instance.dag_id,
+self.task_instance.task_id,
+self.task_instance.run_id,
+self.task_instance.map_index,
+)
+return task_instance
+
+def safe_to_cancel(self) -> bool:
+"""
+Whether it is safe to cancel the external job which is being executed 
by this trigger.
+
+This is to avoid the case that `asyncio.CancelledError` is called 
because the trigger itself is stopped.
+Because in those cases, we should NOT cancel the external job.
+"""
+# Database query is needed to get the latest state of the task 
instance.
+task_instance = self.get_task_instance()  # type: ignore[call-arg]
+return task_instance.state != TaskInstanceState.DEFERRED
+
 async def run(self) -> AsyncIterator[TriggerEvent]:
 try:
 while True:
@@ -207,7 +243,11 @@ class DataprocClusterTrigger(DataprocBaseTrigger):
 await asyncio.sleep(self.polling_interval_seconds)
 except asyncio.CancelledError:
 try:
-if self.delete_on_error:
+if self.delete_on_error and self.safe_to_cancel():
+self.log.info(
+"Deleting the cluster as it is safe to delete as the 
airflow TaskInstance is not in "
+"deferred state."
+)
 self.log.info("Deleting cluster %s.", self.cluster_name)
 # The synchronous hook is utilized to delete the cluster 
when a task is cancelled.
 # This is because the asynchronous hook deletion is not 
awaited when the trigger task
diff --git a/tests/providers/google/cloud/triggers/test_dataproc.py 
b/tests/providers/google/cloud/triggers/test_dataproc.py
index f41fc3a280..08294a5ac5 100644
--- a/tests/providers/google/cloud/triggers/test_dataproc.py
+++ b/tests/providers/google/cloud/triggers/te

(airflow) branch main updated (4a568d308a -> 9eac30567d)

2024-05-06 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 4a568d308a Fix typo sensitive masking words in docs (#39415)
 add 9eac30567d Updating S3LogLink with an invalid bucket link (#39424)

No new revisions were added by this update.

Summary of changes:
 docs/apache-airflow/howto/define-extra-link.rst | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)



(airflow) branch main updated: get all failed tasks errors in when exception raised in DatabricksCreateJobsOperator (#39354)

2024-05-03 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 2d103e115c get all failed tasks errors in when exception raised in 
DatabricksCreateJobsOperator (#39354)
2d103e115c is described below

commit 2d103e115c9951ce2bccb6b7ffa4fbd7ff269ef3
Author: gaurav7261 <142777151+gaurav7...@users.noreply.github.com>
AuthorDate: Fri May 3 14:14:45 2024 +0530

get all failed tasks errors in when exception raised in 
DatabricksCreateJobsOperator (#39354)
---
 .../providers/databricks/operators/databricks.py   | 21 +++
 .../databricks/operators/test_databricks.py| 73 +-
 2 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/airflow/providers/databricks/operators/databricks.py 
b/airflow/providers/databricks/operators/databricks.py
index c38b0683c3..0d819e1b70 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -67,23 +67,22 @@ def _handle_databricks_operator_execution(operator, hook, 
log, context) -> None:
 log.info("%s completed successfully.", operator.task_id)
 log.info("View run status, Spark UI, and logs at %s", 
run_page_url)
 return
-
 if run_state.result_state == "FAILED":
-task_run_id = None
+failed_tasks = []
 for task in run_info.get("tasks", []):
 if task.get("state", {}).get("result_state", "") == 
"FAILED":
 task_run_id = task["run_id"]
-if task_run_id is not None:
-run_output = hook.get_run_output(task_run_id)
-if "error" in run_output:
-notebook_error = run_output["error"]
-else:
-notebook_error = run_state.state_message
-else:
-notebook_error = run_state.state_message
+task_key = task["task_key"]
+run_output = hook.get_run_output(task_run_id)
+if "error" in run_output:
+error = run_output["error"]
+else:
+error = run_state.state_message
+failed_tasks.append({"task_key": task_key, 
"run_id": task_run_id, "error": error})
+
 error_message = (
 f"{operator.task_id} failed with terminal state: 
{run_state} "
-f"and with the error {notebook_error}"
+f"and with the errors {failed_tasks}"
 )
 else:
 error_message = (
diff --git a/tests/providers/databricks/operators/test_databricks.py 
b/tests/providers/databricks/operators/test_databricks.py
index e6cb240dfc..64b9ba985c 100644
--- a/tests/providers/databricks/operators/test_databricks.py
+++ b/tests/providers/databricks/operators/test_databricks.py
@@ -1310,6 +1310,7 @@ class TestDatabricksRunNowOperator:
 "tasks": [
 {
 "run_id": 2,
+"task_key": "first_task",
 "state": {
 "life_cycle_state": "TERMINATED",
 "result_state": "FAILED",
@@ -1321,10 +1322,76 @@ class TestDatabricksRunNowOperator:
 )
 db_mock.get_run_output = mock_dict({"error": "Exception: Something 
went wrong..."})
 
-with pytest.raises(AirflowException) as exc_info:
+with pytest.raises(AirflowException, match="Exception: Something went 
wrong"):
 op.execute(None)
 
-assert exc_info.value.args[0].endswith(" Exception: Something went 
wrong...")
+expected = utils.normalise_json_content(
+{
+"notebook_params": NOTEBOOK_PARAMS,
+"notebook_task": NOTEBOOK_TASK,
+"jar_params": JAR_PARAMS,
+"job_id": JOB_ID,
+}
+)
+db_mock_class.assert_called_once_with(
+DEFAULT_CONN_ID,
+retry_limit=op.databricks_retry_limit,
+retry_delay=op.databricks_retry_delay,
+retry_args=None,
+caller="DatabricksRunNowOperator"

(airflow) branch main updated (0a98435760 -> 61d1c95278)

2024-05-03 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 0a98435760 Resolve deprecations in Python sensors/operators/decorators 
(#39241)
 add 61d1c95278 Fix TaskHandlerWithCustomFormatter now adds prefix only 
once (#38502)

No new revisions were added by this update.

Summary of changes:
 airflow/config_templates/config.yml|  2 +-
 .../log/task_handler_with_custom_formatter.py  |  3 ++-
 .../test_task_handler_with_custom_formatter.py | 24 ++
 3 files changed, 23 insertions(+), 6 deletions(-)



(airflow) branch main updated (42dbccaac2 -> da6c2bcf3c)

2024-05-01 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 42dbccaac2 [FEAT] raise exception with main notebook error in 
DatabricksRunNowDeferrableOperator (#39110)
 add da6c2bcf3c OpenAI Files & Vector Store Hooks (#39248)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/openai/hooks/openai.py| 178 +++
 airflow/providers/openai/provider.yaml  |   2 +-
 generated/provider_dependencies.json|   2 +-
 tests/providers/openai/hooks/test_openai.py | 214 ++--
 4 files changed, 361 insertions(+), 35 deletions(-)



(airflow) branch main updated (97871a0378 -> 42dbccaac2)

2024-05-01 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 97871a0378 Fix SparkKubernetesOperator when using initContainers 
(#38119)
 add 42dbccaac2 [FEAT] raise exception with main notebook error in 
DatabricksRunNowDeferrableOperator (#39110)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/databricks/hooks/databricks.py   | 11 +++
 .../providers/databricks/operators/databricks.py   | 11 +--
 .../providers/databricks/triggers/databricks.py| 45 ++
 airflow/providers/databricks/utils/databricks.py   |  2 +-
 .../providers/databricks/hooks/test_databricks.py  | 17 
 .../databricks/operators/test_databricks.py|  5 ++
 .../databricks/triggers/test_databricks.py | 99 +-
 .../providers/databricks/utils/test_databricks.py  |  1 +
 8 files changed, 168 insertions(+), 23 deletions(-)



(airflow) branch main updated (778e8c50b9 -> d4bdffc45c)

2024-04-30 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 778e8c50b9 Pinecone provider support for `pinecone-client`>=3  (#37307)
 add d4bdffc45c migrate to dbt v3 api for project endpoints (#39214)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/dbt/cloud/hooks/dbt.py| 25 +++---
 tests/providers/dbt/cloud/hooks/test_dbt.py | 51 +
 2 files changed, 44 insertions(+), 32 deletions(-)



(airflow) branch main updated: Pinecone provider support for `pinecone-client`>=3 (#37307)

2024-04-30 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 778e8c50b9 Pinecone provider support for `pinecone-client`>=3  (#37307)
778e8c50b9 is described below

commit 778e8c50b987176b15689bb681ac4c48d7a7805a
Author: Kalyan 
AuthorDate: Tue Apr 30 16:20:19 2024 +0530

Pinecone provider support for `pinecone-client`>=3  (#37307)
---
 airflow/providers/pinecone/CHANGELOG.rst   |  30 +++
 airflow/providers/pinecone/hooks/pinecone.py   | 221 +
 airflow/providers/pinecone/operators/pinecone.py   | 130 
 airflow/providers/pinecone/provider.yaml   |   5 +-
 .../connections.rst|  12 +-
 docs/apache-airflow-providers-pinecone/index.rst   |   4 +-
 .../operators/pinecone.rst |  52 -
 generated/provider_dependencies.json   |   2 +-
 tests/providers/pinecone/hooks/test_pinecone.py|  44 +++-
 .../providers/pinecone/example_create_pod_index.py |  51 +
 .../pinecone/example_create_serverless_index.py|  50 +
 11 files changed, 496 insertions(+), 105 deletions(-)

diff --git a/airflow/providers/pinecone/CHANGELOG.rst 
b/airflow/providers/pinecone/CHANGELOG.rst
index 7b2a20deb0..a1482f9534 100644
--- a/airflow/providers/pinecone/CHANGELOG.rst
+++ b/airflow/providers/pinecone/CHANGELOG.rst
@@ -20,6 +20,36 @@
 Changelog
 -
 
+2.0.0
+.
+
+Breaking changes
+
+
+.. warning::
+   This release of provider has breaking changes from previous versions. 
Changes are based on
+   the migration guide from pinecone - 
<https://canyon-quilt-082.notion.site/Pinecone-Python-SDK-v3-0-0-Migration-Guide-056d3897d7634bf7be399676a4757c7b>
+
+* ``log_level`` field is removed from the Connections as it is not used by the 
provider anymore.
+* ``PineconeHook.get_conn`` is removed in favor of ``conn`` property which 
returns the Connection object. Use ``pinecone_client`` property to access the 
Pinecone client.
+*  Following ``PineconeHook`` methods are converted from static methods to 
instance methods. Hence, Initialization is required to use these now:
+
+   + ``PineconeHook.list_indexes``
+   + ``PineconeHook.upsert``
+   + ``PineconeHook.create_index``
+   + ``PineconeHook.describe_index``
+   + ``PineconeHook.delete_index``
+   + ``PineconeHook.configure_index``
+   + ``PineconeHook.create_collection``
+   + ``PineconeHook.delete_collection``
+   + ``PineconeHook.describe_collection``
+   + ``PineconeHook.list_collections``
+   + ``PineconeHook.query_vector``
+   + ``PineconeHook.describe_index_stats``
+
+* ``PineconeHook.create_index`` is updated to accept a ``ServerlessSpec`` or 
``PodSpec`` instead of directly accepting index related configurations
+* To initialize ``PineconeHook`` object, API key needs to be passed via 
argument or the connection.
+
 1.1.2
 .
 
diff --git a/airflow/providers/pinecone/hooks/pinecone.py 
b/airflow/providers/pinecone/hooks/pinecone.py
index 3d11c74b64..a04ae60ce8 100644
--- a/airflow/providers/pinecone/hooks/pinecone.py
+++ b/airflow/providers/pinecone/hooks/pinecone.py
@@ -20,9 +20,11 @@
 from __future__ import annotations
 
 import itertools
+import os
+from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
-import pinecone
+from pinecone import Pinecone, PodSpec, ServerlessSpec
 
 from airflow.hooks.base import BaseHook
 
@@ -30,6 +32,8 @@ if TYPE_CHECKING:
 from pinecone.core.client.model.sparse_values import SparseValues
 from pinecone.core.client.models import DescribeIndexStatsResponse, 
QueryResponse, UpsertResponse
 
+from airflow.models.connection import Connection
+
 
 class PineconeHook(BaseHook):
 """
@@ -49,10 +53,11 @@ class PineconeHook(BaseHook):
 """Return connection widgets to add to connection form."""
 from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
 from flask_babel import lazy_gettext
-from wtforms import StringField
+from wtforms import BooleanField, StringField
 
 return {
-"log_level": StringField(lazy_gettext("Log Level"), 
widget=BS3TextFieldWidget(), default=None),
+"region": StringField(lazy_gettext("Pinecone Region"), 
widget=BS3TextFieldWidget(), default=None),
+"debug_curl": BooleanField(lazy_gettext("PINECONE_DEBUG_CURL"), 
default=False),
 "project_id": StringField(
 lazy_gettext("Project ID"),
 widget=BS3TextFieldWidget(),
@@ -64,43 +69,73 @@ class PineconeHook(BaseHook):
 """Return custom field behaviour."""
 return {
 "hidden_fiel

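A hedged sketch of the post-migration hook usage implied by the changelog above; the ``conn_id`` argument and the ``create_index`` parameter names are assumptions and should be checked against the new signatures:

    from pinecone import ServerlessSpec

    from airflow.providers.pinecone.hooks.pinecone import PineconeHook

    # Assumed parameter names; see the provider docs for the exact signature.
    hook = PineconeHook(conn_id="pinecone_default")
    hook.create_index(
        index_name="example-index",
        dimension=128,
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
    print(hook.list_indexes())
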
(airflow) branch main updated: fix(io.path): add missing conn_id to string representation of ObjectStoragePath (#39313)

2024-04-29 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new ddd3b5b738 fix(io.path): add missing conn_id to string representation 
of ObjectStoragePath (#39313)
ddd3b5b738 is described below

commit ddd3b5b738fbfc13f6deeb7f0bc1a1e99c3013ba
Author: Wei Lee 
AuthorDate: Tue Apr 30 08:41:10 2024 +0800

fix(io.path): add missing conn_id to string representation of 
ObjectStoragePath (#39313)
---
 airflow/io/path.py| 6 ++
 tests/io/test_path.py | 5 +
 2 files changed, 11 insertions(+)

diff --git a/airflow/io/path.py b/airflow/io/path.py
index 709f50a316..87ab420b9c 100644
--- a/airflow/io/path.py
+++ b/airflow/io/path.py
@@ -361,3 +361,9 @@ class ObjectStoragePath(CloudPath):
 conn_id = data.pop("conn_id", None)
 
 return ObjectStoragePath(path, conn_id=conn_id, **_kwargs)
+
+def __str__(self):
+conn_id = self.storage_options.get("conn_id")
+if self._protocol and conn_id:
+return f"{self._protocol}://{conn_id}@{self.path}"
+return super().__str__()
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index c08284163b..1ccdfbfb79 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -403,3 +403,8 @@ class TestFs:
 
 assert o.fs is not None
 assert o._fs_cached
+
+@pytest.mark.parametrize("input_str", ("file:///tmp/foo", 
"s3://conn_id@bucket/test.txt"))
+def test_str(self, input_str):
+o = ObjectStoragePath(input_str)
+assert str(o) == input_str


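The new behaviour, shown as a short round-trip example mirroring the test above:

    from airflow.io.path import ObjectStoragePath

    path = ObjectStoragePath("s3://conn_id@bucket/test.txt")
    assert str(path) == "s3://conn_id@bucket/test.txt"  # the conn_id is no longer dropped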

(airflow) branch main updated: Allowing tasks to start execution directly from triggerer without going to worker (#38674)

2024-04-29 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 6112745b8f Allowing tasks to start execution directly from triggerer 
without going to worker (#38674)
6112745b8f is described below

commit 6112745b8f2ac7b0eeaf909a9f3209ff2819cc8b
Author: Wei Lee 
AuthorDate: Mon Apr 29 19:19:37 2024 +0800

Allowing tasks to start execution directly from triggerer without going to 
worker (#38674)
---
 airflow/decorators/base.py |  2 +
 airflow/models/abstractoperator.py |  2 +
 airflow/models/baseoperator.py |  5 ++
 airflow/models/dagrun.py   | 22 --
 airflow/models/mappedoperator.py   |  7 ++
 airflow/serialization/serialized_objects.py| 30 +++-
 .../authoring-and-scheduling/deferring.rst | 54 +++
 tests/models/test_dagrun.py| 28 
 tests/serialization/test_dag_serialization.py  | 79 ++
 9 files changed, 222 insertions(+), 7 deletions(-)

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 7adaf8a447..2ae85a9c43 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -509,6 +509,8 @@ class _TaskDecorator(ExpandableFactory, Generic[FParams, 
FReturn, OperatorSubcla
 # task's expand() contribute to the op_kwargs operator argument, 
not
 # the operator arguments themselves, and should expand against it.
 expand_input_attr="op_kwargs_expand_input",
+start_trigger=self.operator_class.start_trigger,
+next_method=self.operator_class.next_method,
 )
 return XComArg(operator=operator)
 
diff --git a/airflow/models/abstractoperator.py 
b/airflow/models/abstractoperator.py
index fa59d6cfc9..3320b10109 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -122,6 +122,8 @@ class AbstractOperator(Templater, DAGNode):
 "node_id",  # Duplicates task_id
 "task_group",  # Doesn't have a useful repr, no point showing in UI
 "inherits_from_empty_operator",  # impl detail
+"start_trigger",
+"next_method",
 # For compatibility with TG, for operators these are just the 
current task, no point showing
 "roots",
 "leaves",
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 0fe9bdf380..ded7d2861e 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -818,6 +818,9 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 # Set to True for an operator instantiated by a mapped operator.
 __from_mapped = False
 
+start_trigger: BaseTrigger | None = None
+next_method: str | None = None
+
 def __init__(
 self,
 task_id: str,
@@ -1675,6 +1678,8 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 "is_teardown",
 "on_failure_fail_dagrun",
 "map_index_template",
+"start_trigger",
+"next_method",
 }
 )
 DagContext.pop_context_managed_dag()
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 777280e9aa..774f8d0983 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -51,7 +51,7 @@ from airflow import settings
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import DagCallbackRequest
 from airflow.configuration import conf as airflow_conf
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, 
TaskNotFound
+from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, 
TaskDeferred, TaskNotFound
 from airflow.listeners.listener import get_listener_manager
 from airflow.models import Log
 from airflow.models.abstractoperator import NotMapped
@@ -905,9 +905,11 @@ class DagRun(Base, LoggingMixin):
 self.run_id,
 self.start_date,
 self.end_date,
-(self.end_date - self.start_date).total_seconds()
-if self.start_date and self.end_date
-else None,
+(
+(self.end_date - self.start_date).total_seconds()
+if self.start_date and self.end_date
+else None
+),
 self._state,
 self.external_trigger,
 self.run_type,
@@ -1537,6 +1539,18 @@ class DagRun(Base, LoggingMixin):

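One plausible shape for an operator that opts into the new attributes; the operator name and trigger choice are illustrative only, see the deferring docs added by this commit for the authoritative example:

    from datetime import timedelta

    from airflow.models.baseoperator import BaseOperator
    from airflow.triggers.temporal import TimeDeltaTrigger


    class WaitOperator(BaseOperator):
        # With these set, scheduling hands the task straight to the triggerer
        # instead of first occupying a worker slot.
        start_trigger = TimeDeltaTrigger(timedelta(minutes=10))
        next_method = "execute_complete"

        def execute_complete(self, context, event=None):
            return event
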
(airflow) branch main updated: Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator (#39018)

2024-04-28 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 28a240a18f Fix deferrable mode for DataflowTemplatedJobStartOperator 
and DataflowStartFlexTemplateOperator (#39018)
28a240a18f is described below

commit 28a240a18f7e5958e69732f61d639e1d8f39152f
Author: Eugene <53026723+e-ga...@users.noreply.github.com>
AuthorDate: Mon Apr 29 02:40:20 2024 +

Fix deferrable mode for DataflowTemplatedJobStartOperator and 
DataflowStartFlexTemplateOperator (#39018)
---
 airflow/providers/google/cloud/hooks/dataflow.py   | 177 +
 .../providers/google/cloud/operators/dataflow.py   |  86 ++
 .../providers/google/cloud/triggers/dataflow.py|   2 +-
 .../operators/cloud/dataflow.rst   |  20 ++-
 .../providers/google/cloud/hooks/test_dataflow.py  |  52 ++
 .../google/cloud/operators/test_dataflow.py|  50 --
 .../cloud/dataflow/example_dataflow_template.py|  42 -
 7 files changed, 336 insertions(+), 93 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataflow.py 
b/airflow/providers/google/cloud/hooks/dataflow.py
index a9bf802b14..59eee63501 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -41,9 +41,13 @@ from google.cloud.dataflow_v1beta3 import (
 MessagesV1Beta3AsyncClient,
 MetricsV1Beta3AsyncClient,
 )
-from google.cloud.dataflow_v1beta3.types import GetJobMetricsRequest, 
JobMessageImportance, JobMetrics
+from google.cloud.dataflow_v1beta3.types import (
+GetJobMetricsRequest,
+JobMessageImportance,
+JobMetrics,
+)
 from google.cloud.dataflow_v1beta3.types.jobs import ListJobsRequest
-from googleapiclient.discovery import build
+from googleapiclient.discovery import Resource, build
 
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType, 
beam_options_to_args
@@ -573,7 +577,7 @@ class DataflowHook(GoogleBaseHook):
 impersonation_chain=impersonation_chain,
 )
 
-def get_conn(self) -> build:
+def get_conn(self) -> Resource:
 """Return a Google Cloud Dataflow service object."""
 http_authorized = self._authorize()
 return build("dataflow", "v1b3", http=http_authorized, 
cache_discovery=False)
@@ -653,9 +657,9 @@ class DataflowHook(GoogleBaseHook):
 on_new_job_callback: Callable[[dict], None] | None = None,
 location: str = DEFAULT_DATAFLOW_LOCATION,
 environment: dict | None = None,
-) -> dict:
+) -> dict[str, str]:
 """
-Start Dataflow template job.
+Launch a Dataflow job with a Classic Template and wait for its 
completion.
 
 :param job_name: The name of the job.
 :param variables: Map of job runtime environment options.
@@ -688,26 +692,14 @@ class DataflowHook(GoogleBaseHook):
 environment=environment,
 )
 
-service = self.get_conn()
-
-request = (
-service.projects()
-.locations()
-.templates()
-.launch(
-projectId=project_id,
-location=location,
-gcsPath=dataflow_template,
-body={
-"jobName": name,
-"parameters": parameters,
-"environment": environment,
-},
-)
+job: dict[str, str] = self.send_launch_template_request(
+project_id=project_id,
+location=location,
+gcs_path=dataflow_template,
+job_name=name,
+parameters=parameters,
+environment=environment,
 )
-response = request.execute(num_retries=self.num_retries)
-
-job = response["job"]
 
 if on_new_job_id_callback:
 warnings.warn(
@@ -715,7 +707,7 @@ class DataflowHook(GoogleBaseHook):
 AirflowProviderDeprecationWarning,
 stacklevel=3,
 )
-on_new_job_id_callback(job.get("id"))
+on_new_job_id_callback(job["id"])
 
 if on_new_job_callback:
 on_new_job_callback(job)
@@ -734,7 +726,62 @@ class DataflowHook(GoogleBaseHook):
 expected_terminal_state=self.expected_terminal_state,
 )
 jobs_controller.wait_for_done()
-return response["job"]
+return job
+
+@_fallback_to_location_from_variables
+@_fallback_to_project_id_from_variables
+@GoogleBaseHook.fallback_to_default_project_id
+def launch_job_with_template(
+   

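An illustrative configuration of the templated-job operator in deferrable mode after this fix; the project, bucket, and template paths are placeholders:

    from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

    start_template_job = DataflowTemplatedJobStartOperator(
        task_id="start_template_job_deferrable",
        project_id="my-project",
        template="gs://dataflow-templates/latest/Word_Count",
        parameters={"inputFile": "gs://my-bucket/input.txt", "output": "gs://my-bucket/output"},
        location="europe-west3",
        deferrable=True,
    )
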
(airflow) branch main updated (bea1b7f70c -> 7683344c9c)

2024-04-26 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from bea1b7f70c Improve `DataprocCreateClusterOperator` Triggers for Better 
Error Handling and Resource Cleanup (#39130)
 add 7683344c9c Add `DatabricksNotebookOperator` (#39178)

No new revisions were added by this update.

Summary of changes:
 .../providers/databricks/operators/databricks.py   | 174 +
 airflow/providers/databricks/provider.yaml |   1 +
 .../operators/notebook.rst |  44 ++
 .../databricks/operators/test_databricks.py| 144 -
 .../providers/databricks/example_databricks.py |  54 +++
 5 files changed, 416 insertions(+), 1 deletion(-)
 create mode 100644 
docs/apache-airflow-providers-databricks/operators/notebook.rst


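A hedged usage sketch for the new operator; the notebook path, cluster id, and connection id are placeholders, and the parameter names should be checked against the operator docs added by this commit:

    from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

    notebook_task = DatabricksNotebookOperator(
        task_id="run_notebook",
        databricks_conn_id="databricks_default",
        notebook_path="/Shared/example_notebook",
        source="WORKSPACE",
        existing_cluster_id="1234-567890-abcde123",
    )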

(airflow) branch main updated (79a26aa3b9 -> 04ac0c1b32)

2024-04-22 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 79a26aa3b9 Allow sort by on all fields in MappedInstances.tsx (#38090)
 add 04ac0c1b32 Add notification settings paramaters (#39175)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/databricks/operators/databricks.py   |  8 
 .../operators/jobs_create.rst  |  2 ++
 tests/providers/databricks/operators/test_databricks.py| 10 ++
 3 files changed, 20 insertions(+)



(airflow) branch main updated (a6f612d899 -> 2674a69780)

2024-04-19 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from a6f612d899 Rename model `ImportError` to `ParseImportError` for avoid 
shadowing with builtin exception (#39116)
 add 2674a69780 OpenAI Chat & Assistant hook functions (#38736)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/openai/hooks/openai.py| 176 -
 airflow/providers/openai/provider.yaml  |   2 +-
 generated/provider_dependencies.json|   2 +-
 tests/providers/openai/hooks/test_openai.py | 233 
 4 files changed, 410 insertions(+), 3 deletions(-)



(airflow) branch main updated: Deferrable mode for Dataflow sensors (#37693)

2024-04-17 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new b41cf629c8 Deferrable mode for Dataflow sensors (#37693)
b41cf629c8 is described below

commit b41cf629c8624b906ed29760e14037e8d2f9a370
Author: Eugene <53026723+e-ga...@users.noreply.github.com>
AuthorDate: Thu Apr 18 06:38:11 2024 +

Deferrable mode for Dataflow sensors (#37693)
---
 airflow/providers/google/cloud/hooks/dataflow.py   | 103 +++-
 airflow/providers/google/cloud/sensors/dataflow.py | 282 --
 .../providers/google/cloud/triggers/dataflow.py| 506 -
 .../operators/cloud/dataflow.rst   |  32 ++
 .../providers/google/cloud/hooks/test_dataflow.py  |  55 +-
 .../google/cloud/sensors/test_dataflow.py  | 462 ++-
 .../google/cloud/triggers/test_dataflow.py | 619 -
 .../example_dataflow_sensors_deferrable.py | 190 +++
 8 files changed, 2176 insertions(+), 73 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataflow.py 
b/airflow/providers/google/cloud/hooks/dataflow.py
index 64f309709c..a9bf802b14 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -31,7 +31,17 @@ from copy import deepcopy
 from typing import TYPE_CHECKING, Any, Callable, Generator, Sequence, TypeVar, 
cast
 
 from deprecated import deprecated
-from google.cloud.dataflow_v1beta3 import GetJobRequest, Job, JobState, 
JobsV1Beta3AsyncClient, JobView
+from google.cloud.dataflow_v1beta3 import (
+GetJobRequest,
+Job,
+JobState,
+JobsV1Beta3AsyncClient,
+JobView,
+ListJobMessagesRequest,
+MessagesV1Beta3AsyncClient,
+MetricsV1Beta3AsyncClient,
+)
+from google.cloud.dataflow_v1beta3.types import GetJobMetricsRequest, 
JobMessageImportance, JobMetrics
 from google.cloud.dataflow_v1beta3.types.jobs import ListJobsRequest
 from googleapiclient.discovery import build
 
@@ -47,6 +57,8 @@ from airflow.utils.timeout import timeout
 
 if TYPE_CHECKING:
 from google.cloud.dataflow_v1beta3.services.jobs_v1_beta3.pagers import 
ListJobsAsyncPager
+from google.cloud.dataflow_v1beta3.services.messages_v1_beta3.pagers 
import ListJobMessagesAsyncPager
+from google.protobuf.timestamp_pb2 import Timestamp
 
 
 # This is the default location
@@ -1353,3 +1365,92 @@ class AsyncDataflowHook(GoogleBaseAsyncHook):
 )
 page_result: ListJobsAsyncPager = await 
client.list_jobs(request=request)
 return page_result
+
+async def list_job_messages(
+self,
+job_id: str,
+project_id: str | None = PROVIDE_PROJECT_ID,
+minimum_importance: int = JobMessageImportance.JOB_MESSAGE_BASIC,
+page_size: int | None = None,
+page_token: str | None = None,
+start_time: Timestamp | None = None,
+end_time: Timestamp | None = None,
+location: str | None = DEFAULT_DATAFLOW_LOCATION,
+) -> ListJobMessagesAsyncPager:
+"""
+Return ListJobMessagesAsyncPager object from 
MessagesV1Beta3AsyncClient.
+
+This method wraps around a similar method of 
MessagesV1Beta3AsyncClient. ListJobMessagesAsyncPager can be iterated
+over to extract messages associated with a specific Job ID.
+
+For more details see the MessagesV1Beta3AsyncClient method description 
at:
+
https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient
+
+:param job_id: ID of the Dataflow job to get messages about.
+:param project_id: Optional. The Google Cloud project ID in which to 
start a job.
+If set to None or missing, the default project_id from the Google 
Cloud connection is used.
+:param minimum_importance: Optional. Filter to only get messages with 
importance >= level.
+For more details see the description at:
+
https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance
+:param page_size: Optional. If specified, determines the maximum 
number of messages to return.
+If unspecified, the service may choose an appropriate default, or 
may return an arbitrarily large number of results.
+:param page_token: Optional. If supplied, this should be the value of 
next_page_token returned by an earlier call.
+This will cause the next page of results to be returned.
+:param start_time: Optional. If specified, return only messages with 
timestamps >= start_time.
+The default is the job creation time (i.e. beginning of messages).
+:param end_time: Optional. If specified, return only messages with 
timesta

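A minimal async sketch of the new message-listing helper, following the docstring above; the job id, project, and location are placeholders:

    import asyncio

    from airflow.providers.google.cloud.hooks.dataflow import AsyncDataflowHook


    async def show_job_messages():
        hook = AsyncDataflowHook(gcp_conn_id="google_cloud_default")
        pager = await hook.list_job_messages(
            job_id="2024-01-01_00_00_00-1234567890123456789",
            project_id="my-project",
            location="us-central1",
        )
        async for message in pager:
            print(message.time, message.message_text)


    asyncio.run(show_job_messages())
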
(airflow) branch main updated (40bddb3686 -> 66df296a6e)

2024-04-16 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 40bddb3686 Fix doc redirects (#39039)
 add 66df296a6e [FIX] typo in parameter (#39050)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/databricks/operators/databricks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow) branch main updated: Add logic to handle on_kill for BigQueryInsertJobOperator when deferrable=True (#38912)

2024-04-15 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new e237041142 Add logic to handle on_kill for BigQueryInsertJobOperator 
when deferrable=True (#38912)
e237041142 is described below

commit e237041142e36349cc62e743105c91b04ddf4253
Author: Ankit Chaurasia <8670962+sunank...@users.noreply.github.com>
AuthorDate: Mon Apr 15 16:51:04 2024 +0545

Add logic to handle on_kill for BigQueryInsertJobOperator when 
deferrable=True (#38912)
---
 airflow/providers/google/cloud/hooks/bigquery.py   | 25 +++
 .../providers/google/cloud/operators/bigquery.py   |  1 +
 .../providers/google/cloud/triggers/bigquery.py| 11 +
 .../providers/google/cloud/hooks/test_bigquery.py  | 49 ++
 .../google/cloud/triggers/test_bigquery.py | 36 
 5 files changed, 122 insertions(+)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py 
b/airflow/providers/google/cloud/hooks/bigquery.py
index 0594ce4351..f270e256fb 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -3388,6 +3388,31 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
 job_query_resp = await job_client.query(query_request, 
cast(Session, session))
 return job_query_resp["jobReference"]["jobId"]
 
+async def cancel_job(self, job_id: str, project_id: str | None, location: 
str | None) -> None:
+"""
+Cancel a BigQuery job.
+
+:param job_id: ID of the job to cancel.
+:param project_id: Google Cloud Project where the job was running.
+:param location: Location where the job was running.
+"""
+async with ClientSession() as session:
+token = await self.get_token(session=session)
+job = Job(job_id=job_id, project=project_id, location=location, 
token=token, session=session)  # type: ignore[arg-type]
+
+self.log.info(
+"Attempting to cancel BigQuery job: %s in project: %s, 
location: %s",
+job_id,
+project_id,
+location,
+)
+try:
+await job.cancel()
+self.log.info("Job %s cancellation requested.", job_id)
+except Exception as e:
+self.log.error("Failed to cancel BigQuery job %s: %s", job_id, 
str(e))
+raise
+
 def get_records(self, query_results: dict[str, Any], as_dict: bool = 
False) -> list[Any]:
 """Convert a response from BigQuery to records.
 
diff --git a/airflow/providers/google/cloud/operators/bigquery.py 
b/airflow/providers/google/cloud/operators/bigquery.py
index 68b423fb46..9da97afc2a 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2903,6 +2903,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, 
_BigQueryOpenLineageMix
 location=self.location or hook.location,
 poll_interval=self.poll_interval,
 impersonation_chain=self.impersonation_chain,
+cancel_on_kill=self.cancel_on_kill,
 ),
 method_name="execute_complete",
 )
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py 
b/airflow/providers/google/cloud/triggers/bigquery.py
index eafa4825be..fd01705261 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -57,6 +57,7 @@ class BigQueryInsertJobTrigger(BaseTrigger):
 table_id: str | None = None,
 poll_interval: float = 4.0,
 impersonation_chain: str | Sequence[str] | None = None,
+cancel_on_kill: bool = True,
 ):
 super().__init__()
 self.log.info("Using the connection  %s .", conn_id)
@@ -69,6 +70,7 @@ class BigQueryInsertJobTrigger(BaseTrigger):
 self.table_id = table_id
 self.poll_interval = poll_interval
 self.impersonation_chain = impersonation_chain
+self.cancel_on_kill = cancel_on_kill
 
 def serialize(self) -> tuple[str, dict[str, Any]]:
 """Serialize BigQueryInsertJobTrigger arguments and classpath."""
@@ -83,6 +85,7 @@ class BigQueryInsertJobTrigger(BaseTrigger):
 "table_id": self.table_id,
 "poll_interval": self.poll_interval,
 "impersonation_chain": self.impersonation_chain,
+"cancel_on_kill": self.cancel_on_kill,
 },
 )
 
@@ -113,6 
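
For DAG authors, the practical effect of #38912 is that a deferred BigQuery job is now cancelled
when the task instance is killed, as long as cancel_on_kill stays enabled (it defaults to True).
A minimal sketch of the operator usage, with a hypothetical query and location:

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    insert_job = BigQueryInsertJobOperator(
        task_id="insert_job",
        configuration={
            "query": {
                "query": "SELECT 1",  # hypothetical query standing in for a real workload
                "useLegacySql": False,
            }
        },
        location="US",        # hypothetical location
        deferrable=True,      # wait for the job in the triggerer instead of a worker slot
        cancel_on_kill=True,  # default; the trigger now receives this and cancels the BigQuery job on kill
    )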

(airflow) branch main updated: Bugfix: Move rendering of `map_index_template` so it renders for failed tasks as long as it was defined before the point of failure (#38902)

2024-04-15 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 456ec48d12 Bugfix: Move rendering of `map_index_template` so it 
renders for failed tasks as long as it was defined before the point of failure 
(#38902)
456ec48d12 is described below

commit 456ec48d12be02ca7266f021a16e01abb5d4c5a3
Author: Tamara Janina Fingerlin <90063506+tja...@users.noreply.github.com>
AuthorDate: Mon Apr 15 13:05:31 2024 +0200

Bugfix: Move rendering of `map_index_template` so it renders for failed 
tasks as long as it was defined before the point of failure (#38902)

Co-authored-by: Tzu-ping Chung 
---
 airflow/models/taskinstance.py  | 25 ++---
 tests/models/test_mappedoperator.py | 29 +
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2b8b935d78..8f9d71cfe7 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2731,18 +2731,29 @@ class TaskInstance(Base, LoggingMixin):
 previous_state=TaskInstanceState.QUEUED, task_instance=self, 
session=session
 )
 
-# Execute the task
+def _render_map_index(context: Context, *, jinja_env: 
jinja2.Environment | None) -> str | None:
+"""Render named map index if the DAG author defined 
map_index_template at the task level."""
+if jinja_env is None or (template := 
context.get("map_index_template")) is None:
+return None
+rendered_map_index = 
jinja_env.from_string(template).render(context)
+log.debug("Map index rendered as %s", rendered_map_index)
+return rendered_map_index
+
+# Execute the task.
 with set_current_context(context):
-result = self._execute_task(context, task_orig)
+try:
+result = self._execute_task(context, task_orig)
+except Exception:
+# If the task failed, swallow rendering error so it 
doesn't mask the main error.
+with contextlib.suppress(jinja2.TemplateSyntaxError, 
jinja2.UndefinedError):
+self.rendered_map_index = _render_map_index(context, 
jinja_env=jinja_env)
+raise
+else:  # If the task succeeded, render normally to let 
rendering error bubble up.
+self.rendered_map_index = _render_map_index(context, 
jinja_env=jinja_env)
 
 # Run post_execute callback
 self.task.post_execute(context=context, result=result)
 
-# DAG authors define map_index_template at the task level
-if jinja_env is not None and (template := 
context.get("map_index_template")) is not None:
-rendered_map_index = self.rendered_map_index = 
jinja_env.from_string(template).render(context)
-self.log.info("Map index rendered as %s", rendered_map_index)
-
 Stats.incr(f"operator_successes_{self.task.task_type}", 
tags=self.stats_tags)
 # Same metric with tagging
 Stats.incr("operator_successes", tags={**self.stats_tags, "task_type": 
self.task.task_type})
diff --git a/tests/models/test_mappedoperator.py 
b/tests/models/test_mappedoperator.py
index e80a629794..9f31652424 100644
--- a/tests/models/test_mappedoperator.py
+++ b/tests/models/test_mappedoperator.py
@@ -631,6 +631,33 @@ def _create_mapped_with_name_template_taskflow(*, task_id, 
map_names, template):
 return task1.expand(map_name=map_names)
 
 
+def _create_named_map_index_renders_on_failure_classic(*, task_id, map_names, 
template):
+class HasMapName(BaseOperator):
+def __init__(self, *, map_name: str, **kwargs):
+super().__init__(**kwargs)
+self.map_name = map_name
+
+def execute(self, context):
+context["map_name"] = self.map_name
+raise AirflowSkipException("Imagine this task failed!")
+
+return HasMapName.partial(task_id=task_id, 
map_index_template=template).expand(
+map_name=map_names,
+)
+
+
+def _create_named_map_index_renders_on_failure_taskflow(*, task_id, map_names, 
template):
+from airflow.operators.python import get_current_context
+
+@task(task_id=task_id, map_index_template=template)
+def task1(map_name):
+context = get_current_context()
+context["map_name"] = map_name
+raise AirflowSkipException("Imagine this task failed!")
+
+return task1.expand(map_name=map_names)
+
+
 @pytest.mark.parametrize(
 "template, expe

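In practice, this means a DAG author's map_index_template now renders for failed mapped task
instances too, provided the values it references were put into the context before the failure.
A minimal sketch of that usage pattern (mirroring the tests above; names are illustrative):

    from airflow.decorators import task
    from airflow.operators.python import get_current_context

    @task(map_index_template="{{ processed_file }}")
    def process(file_name: str):
        context = get_current_context()
        context["processed_file"] = file_name  # set before any failure so the index can still render
        raise ValueError("imagine this task failed")  # the named map index still shows up in the UI

    # inside a DAG body: process.expand(file_name=["a.csv", "b.csv"])
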
(airflow) branch main updated: Adds job_id as path param in update permission (#38962)

2024-04-14 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 629545bea2 Adds job_id as path param in update permission (#38962)
629545bea2 is described below

commit 629545bea2afa55dbda9b839734b4851d9da566e
Author: SubhamSinghal 
AuthorDate: Sun Apr 14 14:11:48 2024 +0530

Adds job_id as path param in update permission (#38962)
---
 airflow/providers/databricks/hooks/databricks.py| 6 +++---
 airflow/providers/databricks/operators/databricks.py| 2 +-
 tests/providers/databricks/operators/test_databricks.py | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/databricks/hooks/databricks.py 
b/airflow/providers/databricks/hooks/databricks.py
index c86391f4cd..1a0ab8e8c6 100644
--- a/airflow/providers/databricks/hooks/databricks.py
+++ b/airflow/providers/databricks/hooks/databricks.py
@@ -51,7 +51,6 @@ DELETE_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/delete")
 REPAIR_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/repair")
 OUTPUT_RUNS_JOB_ENDPOINT = ("GET", "api/2.1/jobs/runs/get-output")
 CANCEL_ALL_RUNS_ENDPOINT = ("POST", "api/2.1/jobs/runs/cancel-all")
-UPDATE_PERMISSION_ENDPOINT = ("PATCH", "api/2.0/permissions/jobs")
 
 INSTALL_LIBS_ENDPOINT = ("POST", "api/2.0/libraries/install")
 UNINSTALL_LIBS_ENDPOINT = ("POST", "api/2.0/libraries/uninstall")
@@ -656,14 +655,15 @@ class DatabricksHook(BaseDatabricksHook):
 
 return None
 
-def update_job_permission(self, json: dict[str, Any]) -> dict:
+def update_job_permission(self, job_id: int, json: dict[str, Any]) -> dict:
 """
 Update databricks job permission.
 
+:param job_id: job id
 :param json: payload
 :return: json containing permission specification
 """
-return self._do_api_call(UPDATE_PERMISSION_ENDPOINT, json)
+return self._do_api_call(("PATCH", 
f"api/2.0/permissions/jobs/{job_id}"), json)
 
 def test_connection(self) -> tuple[bool, str]:
 """Test the Databricks connectivity from UI."""
diff --git a/airflow/providers/databricks/operators/databricks.py 
b/airflow/providers/databricks/operators/databricks.py
index 3d95c61f6a..1d0d920ecc 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -318,7 +318,7 @@ class DatabricksCreateJobsOperator(BaseOperator):
 self._hook.reset_job(str(job_id), self.json)
 if (access_control_list := self.json.get("access_control_list")) is 
not None:
 acl_json = {"access_control_list": access_control_list}
-self._hook.update_job_permission(normalise_json_content(acl_json))
+self._hook.update_job_permission(job_id, 
normalise_json_content(acl_json))
 
 return job_id
 
diff --git a/tests/providers/databricks/operators/test_databricks.py 
b/tests/providers/databricks/operators/test_databricks.py
index 278f95bf01..46e14a917a 100644
--- a/tests/providers/databricks/operators/test_databricks.py
+++ b/tests/providers/databricks/operators/test_databricks.py
@@ -538,7 +538,7 @@ class TestDatabricksCreateJobsOperator:
 caller="DatabricksCreateJobsOperator",
 )
 
-db_mock.update_job_permission.assert_called_once_with(expected)
+db_mock.update_job_permission.assert_called_once_with(JOB_ID, expected)
 
 
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
 def test_exec_update_job_permission_with_empty_acl(self, db_mock_class):
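
For code calling the hook directly, the job id is now a separate argument and ends up in the
request path (PATCH api/2.0/permissions/jobs/{job_id}) rather than only in the payload. A small
sketch, with a placeholder job id and a hypothetical ACL entry:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook(databricks_conn_id="databricks_default")
    acl = {
        "access_control_list": [
            {"user_name": "jane.doe@example.com", "permission_level": "CAN_MANAGE"},  # hypothetical entry
        ]
    }
    hook.update_job_permission(1234, acl)  # 1234 is a placeholder job id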



(airflow) branch main updated: Rename SparkSubmitOperator argument queue as yarn_queue (#38852)

2024-04-11 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 9d46507f59 Rename SparkSubmitOperator argument queue as yarn_queue 
(#38852)
9d46507f59 is described below

commit 9d46507f59d0ec976656f8e9f24046f6db51be4f
Author: Ashish Patel 
AuthorDate: Thu Apr 11 17:39:06 2024 +0530

Rename SparkSubmitOperator argument queue as yarn_queue (#38852)
---
 airflow/providers/apache/spark/hooks/spark_submit.py | 12 ++--
 .../providers/apache/spark/operators/spark_submit.py |  8 
 .../apache/spark/operators/test_spark_submit.py  | 20 +---
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py 
b/airflow/providers/apache/spark/hooks/spark_submit.py
index b06e001a07..8213b9af14 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -82,9 +82,9 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
  (will overwrite any spark_binary defined in the 
connection's extra JSON)
 :param properties_file: Path to a file from which to load extra 
properties. If not
   specified, this will look for 
conf/spark-defaults.conf.
-:param queue: The name of the YARN queue to which the application is 
submitted.
+:param yarn_queue: The name of the YARN queue to which the application is 
submitted.
 (will overwrite any yarn queue defined in the 
connection's extra JSON)
-:param deploy_mode: Whether to deploy your driver on the worker nodes 
(cluster) or locally as anclient.
+:param deploy_mode: Whether to deploy your driver on the worker nodes 
(cluster) or locally as an client.
 (will overwrite any deployment mode defined in the 
connection's extra JSON)
 :param use_krb5ccache: if True, configure spark to use ticket cache 
instead of relying
 on keytab for Kerberos login
@@ -165,7 +165,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 verbose: bool = False,
 spark_binary: str | None = None,
 properties_file: str | None = None,
-queue: str | None = None,
+yarn_queue: str | None = None,
 deploy_mode: str | None = None,
 *,
 use_krb5ccache: bool = False,
@@ -201,7 +201,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 self._kubernetes_driver_pod: str | None = None
 self.spark_binary = spark_binary
 self._properties_file = properties_file
-self._queue = queue
+self._yarn_queue = yarn_queue
 self._deploy_mode = deploy_mode
 self._connection = self._resolve_connection()
 self._is_yarn = "yarn" in self._connection["master"]
@@ -231,7 +231,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 # Build from connection master or default to yarn if not available
 conn_data = {
 "master": "yarn",
-"queue": None,
+"queue": None,  # yarn queue
 "deploy_mode": None,
 "spark_binary": self.spark_binary or DEFAULT_SPARK_BINARY,
 "namespace": None,
@@ -248,7 +248,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 
 # Determine optional yarn queue from the extra field
 extra = conn.extra_dejson
-conn_data["queue"] = self._queue if self._queue else 
extra.get("queue")
+conn_data["queue"] = self._yarn_queue if self._yarn_queue else 
extra.get("queue")
 conn_data["deploy_mode"] = self._deploy_mode if self._deploy_mode 
else extra.get("deploy-mode")
 if not self.spark_binary:
 self.spark_binary = extra.get("spark-binary", 
DEFAULT_SPARK_BINARY)
diff --git a/airflow/providers/apache/spark/operators/spark_submit.py 
b/airflow/providers/apache/spark/operators/spark_submit.py
index 62f7918fcf..281919b2b1 100644
--- a/airflow/providers/apache/spark/operators/spark_submit.py
+++ b/airflow/providers/apache/spark/operators/spark_submit.py
@@ -72,7 +72,7 @@ class SparkSubmitOperator(BaseOperator):
  (will overwrite any spark_binary defined in the 
connection's extra JSON)
 :param properties_file: Path to a file from which to load extra 
properties. If not
   specified, this will look for 
conf/spark-defaults.conf.
-:param queue: The name of the YARN queue to which the application is 
submitted.
+:param yarn_queue: The name of the YARN queue to which the application is 
submitted.
 (will overwrite any yarn queue d
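
After this rename, DAGs that passed queue= to SparkSubmitOperator should pass yarn_queue=
instead, which leaves queue= for BaseOperator's own executor-queue argument. A minimal sketch
with a hypothetical application path and queue name:

    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

    spark_job = SparkSubmitOperator(
        task_id="spark_job",
        application="/opt/jobs/wordcount.py",  # hypothetical application path
        conn_id="spark_default",
        yarn_queue="analytics",                # was `queue` before #38852
    )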

(airflow) branch main updated (c1da5bb6e4 -> 4a669fb1a9)

2024-04-11 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from c1da5bb6e4 Build helm values schemas with kubernetes 1.29 resources 
(#38460)
 add 4a669fb1a9 Remove extra slash from update permission endpoint (#38918)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/databricks/hooks/databricks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow) branch main updated (1757704d8f -> 5ff26586cd)

2024-04-11 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 1757704d8f Enhancement for SSL-support in CloudSQLExecuteQueryOperator 
(#38894)
 add 5ff26586cd Deferrable mode for Custom Training Job operators (#38584)

No new revisions were added by this update.

Summary of changes:
 .../google/cloud/hooks/vertex_ai/custom_job.py | 1505 +++-
 airflow/providers/google/cloud/links/vertex_ai.py  |3 +-
 .../google/cloud/operators/vertex_ai/custom_job.py |  359 -
 .../cloud/operators/vertex_ai/pipeline_job.py  |1 -
 .../providers/google/cloud/triggers/vertex_ai.py   |   94 ++
 .../operators/cloud/vertex_ai.rst  |   48 +-
 .../cloud/hooks/vertex_ai/test_custom_job.py   |  266 +++-
 .../google/cloud/operators/test_vertex_ai.py   |  276 +++-
 .../google/cloud/triggers/test_vertex_ai.py|  520 ++-
 .../example_vertex_ai_custom_container.py  |   38 +-
 .../vertex_ai/example_vertex_ai_custom_job.py  |   69 +-
 .../example_vertex_ai_custom_job_python_package.py |   38 +-
 12 files changed, 3084 insertions(+), 133 deletions(-)
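
The change list above adds a deferrable mode to the Vertex AI custom training job operators. A
rough sketch of how it would be enabled, assuming CreateCustomTrainingJobOperator and the usual
project/region/bucket arguments (all values below are placeholders, not taken from the commit):

    from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
        CreateCustomTrainingJobOperator,
    )

    create_training_job = CreateCustomTrainingJobOperator(
        task_id="create_training_job",
        project_id="my-project",
        region="us-central1",
        staging_bucket="gs://my-staging-bucket",
        display_name="custom-training-job",
        script_path="train.py",
        container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-9:latest",
        deferrable=True,  # poll the training job from the triggerer instead of holding a worker slot
    )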



(airflow) branch main updated: Update COMMITTERS.rst (#38855)

2024-04-09 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 8822692e60 Update COMMITTERS.rst (#38855)
8822692e60 is described below

commit 8822692e60fafa4e54e1a8cf819f8ccd8511be21
Author: Elad Kalif <45845474+elad...@users.noreply.github.com>
AuthorDate: Tue Apr 9 10:58:08 2024 +0300

Update COMMITTERS.rst (#38855)

Co-authored-by: Tzu-ping Chung 
---
 COMMITTERS.rst | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/COMMITTERS.rst b/COMMITTERS.rst
index da215d8baf..4c56e220ed 100644
--- a/COMMITTERS.rst
+++ b/COMMITTERS.rst
@@ -222,5 +222,7 @@ To be able to merge PRs, committers have to integrate their 
GitHub ID with Apach
 * ``dev/breeze/src/airflow_breeze/global_constants.py`` (COMMITTERS 
variable)
 * name and GitHub ID in `project.rst 
<https://github.com/apache/airflow/blob/main/docs/apache-airflow/project.rst>`__.
 * If you had been a collaborator role before getting committer, remove 
your Github ID from ``.asf.yaml``.
-7.  To be listed on airflow main entry web site, also raise a PR in
+7.  Raise PR to airflow site with the following:
+* List your name in the committers list
 `Airflow-Site committers.json 
<https://github.com/apache/airflow-site/blob/main/landing-pages/site/data/committers.json>`__.
+* Post entry in `Announcements 
<https://github.com/apache/airflow-site/blob/main/landing-pages/site/content/en/announcements/_index.md>`__.



(airflow-site) branch add-weilee-to-committer-list updated (587ae05178 -> 43cd4e89d8)

2024-04-08 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch add-weilee-to-committer-list
in repository https://gitbox.apache.org/repos/asf/airflow-site.git


 discard 587ae05178 add Wei Lee to committer list
 add 43cd4e89d8 add Wei Lee to committer list

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (587ae05178)
\
 N -- N -- N   refs/heads/add-weilee-to-committer-list (43cd4e89d8)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 landing-pages/site/data/committers.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow) branch main updated: Add cancel_previous_run to DatabricksRunNowOperator (#38702)

2024-04-07 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 4e6d3fa4cf Add cancel_previous_run to DatabricksRunNowOperator (#38702)
4e6d3fa4cf is described below

commit 4e6d3fa4cf60f4b59325bbffa658ebb0b12aee87
Author: SubhamSinghal 
AuthorDate: Mon Apr 8 07:16:40 2024 +0530

Add cancel_previous_run to DatabricksRunNowOperator (#38702)

Co-authored-by: subham611 
---
 .../providers/databricks/operators/databricks.py   |   7 ++
 .../operators/run_now.rst  |   3 +
 .../databricks/operators/test_databricks.py| 110 +
 3 files changed, 99 insertions(+), 21 deletions(-)

diff --git a/airflow/providers/databricks/operators/databricks.py 
b/airflow/providers/databricks/operators/databricks.py
index 0eedc444fb..247d810a6b 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -651,6 +651,7 @@ class DatabricksRunNowOperator(BaseOperator):
 - ``spark_submit_params``
 - ``idempotency_token``
 - ``repair_run``
+- ``cancel_previous_runs``
 
 :param job_id: the job_id of the existing Databricks job.
 This field will be templated.
@@ -740,6 +741,7 @@ class DatabricksRunNowOperator(BaseOperator):
 :param wait_for_termination: if we should wait for termination of the job 
run. ``True`` by default.
 :param deferrable: Run operator in the deferrable mode.
 :param repair_run: Repair the databricks run in case of failure.
+:param cancel_previous_runs: Cancel all existing running jobs before 
submitting new one.
 """
 
 # Used in airflow.models.BaseOperator
@@ -771,6 +773,7 @@ class DatabricksRunNowOperator(BaseOperator):
 wait_for_termination: bool = True,
 deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
 repair_run: bool = False,
+cancel_previous_runs: bool = False,
 **kwargs,
 ) -> None:
 """Create a new ``DatabricksRunNowOperator``."""
@@ -784,6 +787,7 @@ class DatabricksRunNowOperator(BaseOperator):
 self.wait_for_termination = wait_for_termination
 self.deferrable = deferrable
 self.repair_run = repair_run
+self.cancel_previous_runs = cancel_previous_runs
 
 if job_id is not None:
 self.json["job_id"] = job_id
@@ -831,6 +835,9 @@ class DatabricksRunNowOperator(BaseOperator):
 self.json["job_id"] = job_id
 del self.json["job_name"]
 
+if self.cancel_previous_runs and self.json["job_id"] is not None:
+hook.cancel_all_runs(self.json["job_id"])
+
 self.run_id = hook.run_now(self.json)
 if self.deferrable:
 _handle_deferrable_databricks_operator_execution(self, hook, 
self.log, context)
diff --git a/docs/apache-airflow-providers-databricks/operators/run_now.rst 
b/docs/apache-airflow-providers-databricks/operators/run_now.rst
index a4b00d9005..facf47e7d6 100644
--- a/docs/apache-airflow-providers-databricks/operators/run_now.rst
+++ b/docs/apache-airflow-providers-databricks/operators/run_now.rst
@@ -45,6 +45,9 @@ All other parameters are optional and described in 
documentation for ``Databrick
 * ``python_named_parameters``
 * ``jar_params``
 * ``spark_submit_params``
+* ``idempotency_token``
+* ``repair_run``
+* ``cancel_previous_runs``
 
 DatabricksRunNowDeferrableOperator
 ==
diff --git a/tests/providers/databricks/operators/test_databricks.py 
b/tests/providers/databricks/operators/test_databricks.py
index 6797377161..f2a3441f43 100644
--- a/tests/providers/databricks/operators/test_databricks.py
+++ b/tests/providers/databricks/operators/test_databricks.py
@@ -737,7 +737,7 @@ class TestDatabricksSubmitRunOperator:
 }
 op = DatabricksSubmitRunOperator(task_id=TASK_ID, json=run)
 db_mock = db_mock_class.return_value
-db_mock.submit_run.return_value = 1
+db_mock.submit_run.return_value = RUN_ID
 db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
 
 op.execute(None)
@@ -767,7 +767,7 @@ class TestDatabricksSubmitRunOperator:
 op = DatabricksSubmitRunOperator(task_id=TASK_ID, json=run)
 db_mock = db_mock_class.return_value
 db_mock.find_pipeline_id_by_name.return_value = 
PIPELINE_ID_TASK["pipeline_id"]
-db_mock.submit_run.return_value = 1
+db_mock.submit_run.return_value = RUN_ID
 db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
 
 op.execute(None)
@@ -798,7 +798,7
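
With the new flag, a DAG can ensure only one run of a Databricks job is in flight: before
triggering, the operator calls cancel_all_runs for the resolved job id. A minimal sketch with a
placeholder job id:

    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

    run_job = DatabricksRunNowOperator(
        task_id="run_job",
        databricks_conn_id="databricks_default",
        job_id=1234,                # placeholder job id
        cancel_previous_runs=True,  # cancel any runs still in flight before triggering a new one
    )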

(airflow-site) branch add-weilee-to-committer-list updated (2c9267e99d -> 587ae05178)

2024-04-04 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch add-weilee-to-committer-list
in repository https://gitbox.apache.org/repos/asf/airflow-site.git


omit 2c9267e99d add Wei Lee to committer list
 add 587ae05178 add Wei Lee to committer list

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (2c9267e99d)
\
 N -- N -- N   refs/heads/add-weilee-to-committer-list (587ae05178)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 landing-pages/site/data/committers.json | 348 
 1 file changed, 174 insertions(+), 174 deletions(-)



(airflow-site) branch add-weilee-to-committer-list created (now 2c9267e99d)

2024-04-04 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a change to branch add-weilee-to-committer-list
in repository https://gitbox.apache.org/repos/asf/airflow-site.git


  at 2c9267e99d add Wei Lee to committer list

This branch includes the following new commits:

 new 2c9267e99d add Wei Lee to committer list

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(airflow-site) 01/01: add Wei Lee to committer list

2024-04-04 Thread weilee
This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch add-weilee-to-committer-list
in repository https://gitbox.apache.org/repos/asf/airflow-site.git

commit 2c9267e99d13a0bbd343ce74f7b7f696272b2c51
Author: Wei Lee 
AuthorDate: Thu Apr 4 17:20:12 2024 +0800

add Wei Lee to committer list
---
 landing-pages/site/data/committers.json | 342 
 1 file changed, 174 insertions(+), 168 deletions(-)

diff --git a/landing-pages/site/data/committers.json 
b/landing-pages/site/data/committers.json
index 71e5b6a188..583e45a0d1 100644
--- a/landing-pages/site/data/committers.json
+++ b/landing-pages/site/data/committers.json
@@ -1,170 +1,176 @@
 [
-  {
-"name": "Amogh Desai",
-"github": "https://github.com/amoghrajesh",
-"image": "https://github.com/amoghrajesh.png",
-"nick": "amoghrajesh"
-  },
-  {
-"name": "Aneesh Joseph",
-"github": "https://github.com/aneesh-joseph",
-"image": "https://github.com/aneesh-joseph.png",
-"nick": "aneesh-joseph"
-  },
-  {
-"name": "Bas Harenslak",
-"github": "https://github.com/BasPH",
-"image": "https://github.com/BasPH.png",
-"nick": "BasPH"
-  },
-  {
-"name": "Chao-Han Tsai",
-"github": "https://github.com/milton0825",
-"image": "https://github.com/milton0825.png",
-"nick": "milton0825"
-  },
-  {
-"name": "Dennis Ferruzzi",
-"github": "https://github.com/ferruzzi",
-"image": "https://github.com/ferruzzi.png",
-"nick": "ferruzzi"
-  },
-  {
-"name": "Felix Uellendall",
-"github": "https://github.com/feluelle",
-"image": "https://github.com/feluelle.png",
-"nick": "feluelle"
-  },
-  {
-"name": "James Timmins",
-"github": "https://github.com/jhtimmins",
-"image": "https://github.com/jhtimmins.png",
-"nick": "jhtimmins"
-  },
-  {
-"name": "Jens Scheffler",
-"github": "https://github.com/jscheffl",
-"image": "https://github.com/jscheffl.png",
-"nick": "jscheffl"
-  },
-  {
-"name": "Jiajie Zhong",
-"github": "https://github.com/zhongjiajie",
-"image": "https://github.com/zhongjiajie.png",
-"nick": "zhongjiajie"
-  },
-  {
-"name": "Josh Fell",
-"github": "https://github.com/josh-fell",
-"image": "https://github.com/josh-fell.png",
-"nick": "josh-fell"
-  },
-  {
-"name": "Joshua Carp",
-"github": "https://github.com/jmcarp",
-"image": "https://github.com/jmcarp.png",
-"nick": "jmcarp"
-  },
-  {
-"name": "Leah E. Cole",
-"github": "https://github.com/leahecole",
-"image": "https://github.com/leahecole.png",
-"nick": "leahecole"
-  },
-  {
-"name": "Maciej Obuchowski",
-"github": "https://github.com/mobuchowski",
-"image": "https://github.com/mobuchowski.png",
-"nick": "mobuchowski"
-  },
-  {
-"name": "Malthe Borch",
-"github": "https://github.com/malthe",
-"image": "https://github.com/malthe.png",
-"nick": "malthe"
-  },
-  {
-"name": "Niko Oliveira",
-"github": "https://github.com/o-nikolas",
-"image": "https://github.com/o-nikolas.png",
-"nick": "o-nikolas"
-  },
-  {
-"name": "Pankaj Koti",
-"github": "https://github.com/pankajkoti",
-"image": "https://github.com/pankajkoti.png",
-"nick": "pankajkoti"
-  },
-  {
-"name": "Pankaj Singh",
-"github": "https://github.com/pankajastro",
-"image": "https://github.com/pankajastro.png",
-"nick": "pankajastro"
-  },
-  {
-"name": "Patrick Leo Tardif",
-"github": "https://github.co