(airflow) branch main updated (bca2930d0e -> c2f1739aa0)

2024-05-24 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from bca2930d0e Make _run_raw_task AIP-44 compatible (#38992)
 add c2f1739aa0 Fix TIPydantic serialization of MappedOperator (#39288)

No new revisions were added by this update.

Summary of changes:
 airflow/models/expandinput.py  |  7 ++-
 airflow/models/mappedoperator.py   |  7 ++-
 airflow/serialization/pydantic/taskinstance.py |  9 ++--
 airflow/serialization/serialized_objects.py|  4 +-
 tests/serialization/test_pydantic_models.py| 64 ++
 5 files changed, 84 insertions(+), 7 deletions(-)



(airflow) branch main updated (6fb9fb7bf8 -> bca2930d0e)

2024-05-24 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 6fb9fb7bf8 Fix MyPy checks in CI to cover also tests folders (#39815)
 add bca2930d0e Make _run_raw_task AIP-44 compatible (#38992)

No new revisions were added by this update.

Summary of changes:
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  21 +-
 airflow/cli/commands/task_command.py   |   2 +-
 airflow/datasets/manager.py|  36 +-
 airflow/models/dagrun.py   |   2 +-
 airflow/models/taskinstance.py | 765 +
 airflow/models/xcom.py |   1 +
 airflow/serialization/pydantic/taskinstance.py |  78 ++-
 airflow/utils/task_instance_session.py |   3 +
 8 files changed, 592 insertions(+), 316 deletions(-)



(airflow) branch main updated (4ee46b984d -> 4d525aa32d)

2024-05-20 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 4ee46b984d Remove `openlineage.common` dependencies in Google and Snowflake providers. (#39614)
 add 4d525aa32d Determine needs_expansion at time of serialization (#39604)

No new revisions were added by this update.

Summary of changes:
 .../endpoints/task_instance_endpoint.py|  3 +--
 airflow/cli/commands/task_command.py   | 11 +
 airflow/models/abstractoperator.py | 15 -
 airflow/models/baseoperator.py |  1 +
 airflow/models/mappedoperator.py   |  1 +
 airflow/models/operator.py | 26 ++
 airflow/serialization/serialized_objects.py|  1 +
 airflow/ti_deps/deps/trigger_rule_dep.py   |  5 ++---
 airflow/www/views.py   |  3 +--
 tests/serialization/test_dag_serialization.py  | 18 ---
 10 files changed, 43 insertions(+), 41 deletions(-)



(airflow) branch main updated: Print stderr from helm test failures (#39698)

2024-05-18 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 70f868e867 Print stderr from helm test failures (#39698)
70f868e867 is described below

commit 70f868e86704ac7810762df97190aa2575fea7d2
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Sat May 18 08:54:14 2024 -0700

Print stderr from helm test failures (#39698)

Previously you would not see the error message; you'd only see that the command failed. Now you'll see it.
---
 tests/charts/helm_template_generator.py | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/tests/charts/helm_template_generator.py b/tests/charts/helm_template_generator.py
index 2cdcce72ce..d3a8ed954b 100644
--- a/tests/charts/helm_template_generator.py
+++ b/tests/charts/helm_template_generator.py
@@ -102,6 +102,11 @@ def validate_k8s_object(instance, kubernetes_version):
 validate.validate(instance)
 
 
+class HelmFailedError(subprocess.CalledProcessError):
+def __str__(self):
+return f"Helm command failed. Args: {self.args}\nStderr: 
\n{self.stderr.decode('utf-8')}"
+
+
 def render_chart(
 name="release-name",
 values=None,
@@ -135,7 +140,10 @@ def render_chart(
 if show_only:
 for i in show_only:
 command.extend(["--show-only", i])
-templates = subprocess.check_output(command, stderr=subprocess.PIPE, cwd=chart_dir)
+result = subprocess.run(command, capture_output=True, cwd=chart_dir)
+if result.returncode:
+raise HelmFailedError(result.returncode, result.args, result.stdout, result.stderr)
+templates = result.stdout
 k8s_objects = yaml.full_load_all(templates)
 k8s_objects = [k8s_object for k8s_object in k8s_objects if k8s_object]  # type: ignore
 for k8s_object in k8s_objects:



(airflow) branch main updated: Rename `telemetry-collection` to `usage-data-collection` (#39673)

2024-05-16 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d4a5f4e3a7 Rename `telemetry-collection` to `usage-data-collection` (#39673)
d4a5f4e3a7 is described below

commit d4a5f4e3a7eb7acc42ea383fda700c3c28d40bf5
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu May 16 15:07:01 2024 -0700

Rename `telemetry-collection` to `usage-data-collection` (#39673)

The point here is to avoid confusion with the _other_ telemetry concept (arguably of greater importance to users), namely OTEL / metrics / stats.

While at it, I made the code a little more provider-agnostic.
---
 airflow/cli/commands/scheduler_command.py   |  4 ++--
 airflow/config_templates/config.yml | 10 +-
 airflow/settings.py |  6 +++---
 .../utils/{scarf.py => usage_data_collection.py}| 12 ++--
 airflow/www/views.py| 21 -
 docs/apache-airflow/faq.rst |  8 
 .../installation/installing-from-pypi.rst   |  5 ++---
 tests/core/test_settings.py | 10 +-
 ...{test_scarf.py => test_usage_data_collection.py} | 20 ++--
 tests/www/views/test_views.py   | 12 ++--
 tests/www/views/test_views_home.py  |  2 +-
 11 files changed, 60 insertions(+), 50 deletions(-)

diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 4f943e961b..2b7c77fda9 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -33,8 +33,8 @@ from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import process_subdir
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
-from airflow.utils.scarf import scarf_analytics
 from airflow.utils.scheduler_health import serve_health_check
+from airflow.utils.usage_data_collection import usage_data_collection
 
 log = logging.getLogger(__name__)
 
@@ -56,7 +56,7 @@ def scheduler(args: Namespace):
 """Start Airflow Scheduler."""
 print(settings.HEADER)
 
-scarf_analytics()
+usage_data_collection()
 
 run_command_with_daemon_option(
 args=args,
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index edfe56b45c..36fb176e95 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2591,10 +2591,10 @@ sensors:
   type: float
   example: ~
   default: "604800"
-telemetry_collection:
+usage_data_collection:
   description: |
-Airflow integrates `Scarf <https://about.scarf.sh/>`__ to collect basic telemetry data during operation.
-This data assists Airflow maintainers in better understanding how Airflow is used.
+Airflow integrates `Scarf <https://about.scarf.sh/>`__ to collect basic platform and usage data
+during operation. This data assists Airflow maintainers in better understanding how Airflow is used.
 Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
 security fixes. Additionally, this information supports key decisions related to the development road map.
 Check the FAQ doc for more information on what data is collected.
@@ -2607,9 +2607,9 @@ telemetry_collection:
   options:
 enabled:
   description: |
-Enable or disable telemetry data collection and sending via Scarf.
+Enable or disable usage data collection and sending.
   version_added: 2.10.0
   type: boolean
   example: ~
   default: "True"
-  see_also: ":ref:`Airflow telemetry FAQ `"
+  see_also: ":ref:`Usage data collection FAQ `"
diff --git a/airflow/settings.py b/airflow/settings.py
index 176d06270e..50c195f7fd 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -576,9 +576,9 @@ def initialize():
 atexit.register(dispose_orm)
 
 
-def is_telemetry_collection_enabled() -> bool:
-"""Check if scarf analytics is enabled."""
-return conf.getboolean("telemetry_collection", "enabled", fallback=True) and (
+def is_usage_data_collection_enabled() -> bool:
+"""Check if data collection is enabled."""
+return conf.getboolean("usage_data_collection", "enabled", fallback=True) and (
 os.getenv("SCARF_ANALYTICS", "").strip().lower() != "false"
 )
 
diff --git a/airflow/utils/sca

(airflow) branch main updated: Add timeout when watching pod events in k8s executor (#39551)

2024-05-15 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 610747d25a Add timeout when watching pod events in k8s executor (#39551)
610747d25a is described below

commit 610747d25a6153574c07624afaadcbf575aa2960
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed May 15 09:23:23 2024 -0700

Add timeout when watching pod events in k8s executor (#39551)

If we don't set a timeout, it may hang indefinitely if there's a network issue.

-

Co-authored-by: Ryan Hatter <25823361+rnh...@users.noreply.github.com>
---
 .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 14 ++
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index d26df876ef..b8235bb5ac 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -98,9 +98,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
 )
 except ReadTimeoutError:
-self.log.warning(
-"There was a timeout error accessing the Kube API. Retrying request.", exc_info=True
-)
+self.log.info("Kubernetes watch timed out waiting for events. Restarting watch.")
 time.sleep(1)
 except Exception:
 self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
@@ -141,7 +139,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 ) -> str | None:
 self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version)
 
-kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
+kwargs: dict[str, Any] = {"label_selector": f"airflow-worker={scheduler_job_id}"}
 if resource_version:
 kwargs["resource_version"] = resource_version
 if kube_config.kube_client_request_args:
@@ -150,6 +148,14 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 
 last_resource_version: str | None = None
 
+# For info about k8s timeout settings see
+# https://github.com/kubernetes-client/python/blob/v29.0.0/examples/watch/timeout-settings.md
+# and https://github.com/kubernetes-client/python/blob/v29.0.0/kubernetes/client/api_client.py#L336-L339
+client_timeout = 30
+server_conn_timeout = 3600
+kwargs["_request_timeout"] = client_timeout
+kwargs["timeout_seconds"] = server_conn_timeout
+
 for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
 task = event["object"]
 self.log.debug("Event: %s had an event of type %s", task.metadata.name, event["type"])
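
For reference, a minimal standalone sketch of the two timeout knobs this commit applies to the pod-event watch -- a client-side socket timeout plus a server-side watch timeout -- using the kubernetes Python client directly. The namespace, label selector, and values below are illustrative assumptions, not taken from the Airflow executor code.

    # Sketch only: shows how _request_timeout (client side) and timeout_seconds
    # (server side) are passed through to a kubernetes watch stream.
    from kubernetes import client, config, watch

    config.load_kube_config()  # or config.load_incluster_config() when running in a pod
    v1 = client.CoreV1Api()

    w = watch.Watch()
    for event in w.stream(
        v1.list_namespaced_pod,
        namespace="default",
        label_selector="airflow-worker=123",
        _request_timeout=30,   # client-side: fail fast if the connection silently hangs
        timeout_seconds=3600,  # server-side: the API server closes the watch after an hour
    ):
        print(event["type"], event["object"].metadata.name)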



(airflow) branch main updated: Remove webserver try_number adjustment (#39623)

2024-05-14 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 029cbaec17 Remove webserver try_number adjustment (#39623)
029cbaec17 is described below

commit 029cbaec174b73370e7c4ef2d7ec76e7be333400
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue May 14 14:34:52 2024 -0700

Remove webserver try_number adjustment (#39623)

Previously we had code to compensate for the fact that we were decrementing try_number when deferring or rescheduling. We can remove this code now. Just missed this in #39336.
---
 airflow/www/utils.py  | 8 +---
 airflow/www/views.py  | 4 ++--
 tests/www/views/test_views_log.py | 4 ++--
 3 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 4235ab597f..53a865c3ca 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -96,12 +96,6 @@ def get_instance_with_map(task_instance, session):
 return get_mapped_summary(task_instance, mapped_instances)
 
 
-def get_try_count(try_number: int, state: State):
-if state in (TaskInstanceState.DEFERRED, TaskInstanceState.UP_FOR_RESCHEDULE):
-return try_number + 1
-return try_number
-
-
 priority: list[None | TaskInstanceState] = [
 TaskInstanceState.FAILED,
 TaskInstanceState.UPSTREAM_FAILED,
@@ -147,7 +141,7 @@ def get_mapped_summary(parent_instance, task_instances):
 "start_date": group_start_date,
 "end_date": group_end_date,
 "mapped_states": mapped_states,
-"try_number": get_try_count(parent_instance.try_number, 
parent_instance.state),
+"try_number": parent_instance.try_number,
 "execution_date": parent_instance.execution_date,
 }
 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index fbc5296f73..26c4d8ff1a 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -409,7 +409,7 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) ->
 "queued_dttm": task_instance.queued_dttm,
 "start_date": task_instance.start_date,
 "end_date": task_instance.end_date,
-"try_number": 
wwwutils.get_try_count(task_instance.try_number, task_instance.state),
+"try_number": task_instance.try_number,
 "note": task_instance.note,
 }
 for task_instance in grouped_tis[item.task_id]
@@ -1687,7 +1687,7 @@ class Airflow(AirflowBaseView):
 
 num_logs = 0
 if ti is not None:
-num_logs = wwwutils.get_try_count(ti.try_number, ti.state)
+num_logs = ti.try_number
 logs = [""] * num_logs
 root = request.args.get("root", "")
 return self.render_template(
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index e32eb6654b..2607317c5f 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -208,8 +208,8 @@ def log_admin_client(log_app):
 [
 (None, 0, 0),
 (TaskInstanceState.UP_FOR_RETRY, 2, 2),
-(TaskInstanceState.UP_FOR_RESCHEDULE, 0, 1),
-(TaskInstanceState.UP_FOR_RESCHEDULE, 1, 2),
+(TaskInstanceState.UP_FOR_RESCHEDULE, 0, 0),
+(TaskInstanceState.UP_FOR_RESCHEDULE, 1, 1),
 (TaskInstanceState.RUNNING, 1, 1),
 (TaskInstanceState.SUCCESS, 1, 1),
 (TaskInstanceState.FAILED, 3, 3),



(airflow) branch main updated (975337a50a -> 05b6b741cb)

2024-05-13 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 975337a50a Skip get_current_task_instance_session if use_internal_api (#39595)
 add 05b6b741cb Use TracebackSession when internal API configured and getting current session (#39600)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/task_instance_session.py | 2 ++
 1 file changed, 2 insertions(+)



(airflow) branch main updated: Skip get_current_task_instance_session if use_internal_api (#39595)

2024-05-13 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 975337a50a Skip get_current_task_instance_session if use_internal_api (#39595)
975337a50a is described below

commit 975337a50a4f64a1adfb6cebf45133006d4f3bcb
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon May 13 20:47:23 2024 -0700

Skip get_current_task_instance_session if use_internal_api (#39595)

We don't need a session when configured to use the internal API.
---
 airflow/utils/task_instance_session.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/airflow/utils/task_instance_session.py b/airflow/utils/task_instance_session.py
index bb9741bf52..e434c4d3ec 100644
--- a/airflow/utils/task_instance_session.py
+++ b/airflow/utils/task_instance_session.py
@@ -23,6 +23,7 @@ import traceback
 from typing import TYPE_CHECKING
 
 from airflow import settings
+from airflow.api_internal.internal_api_call import InternalApiConfig
 
 if TYPE_CHECKING:
 from sqlalchemy.orm import Session
@@ -35,6 +36,8 @@ log = logging.getLogger(__name__)
 def get_current_task_instance_session() -> Session:
 global __current_task_instance_session
 if not __current_task_instance_session:
+if InternalApiConfig.get_use_internal_api():
+return __current_task_instance_session
 log.warning("No task session set for this task. Continuing but this 
likely causes a resource leak.")
 log.warning("Please report this and stacktrace below to 
https://github.com/apache/airflow/issues;)
 for filename, line_number, name, line in traceback.extract_stack():



(airflow) branch main updated: Scheduler to handle incrementing of try_number (#39336)

2024-05-08 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 3938f71dfa Scheduler to handle incrementing of try_number (#39336)
3938f71dfa is described below

commit 3938f71dfae21c84a3518625543a28ad02edf641
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed May 8 19:56:55 2024 -0700

Scheduler to handle incrementing of try_number (#39336)

Previously, there was a lot of bad stuff happening around try_number.

We incremented it when the task started running. And because of that, we had this logic to return "_try_number + 1" when the task was not running. But this gave the "right" try number before it ran, and the wrong number after it ran. And, since it was naively incremented when the task started running -- i.e. without regard to why it was running -- we decremented it when deferring or exiting on a reschedule.

What I do here is try to remove all of that stuff:

- no more private _try_number attr
- no more getter logic
- no more decrementing
- no more incrementing as part of task execution

Now what we do is increment only when the task is set to scheduled and only when it's not coming out of deferral or "up_for_reschedule". So the try_number will be more stable. It will not change throughout the course of task execution. The only time it will be incremented is when there's legitimately a new try.

One consequence of this is that try_number will no longer be incremented if you run either `airflow tasks run` or `ti.run()` in isolation. But because Airflow assumes that all task runs are scheduled by the scheduler, I do not regard this as a breaking change.

If user code or provider code has implemented hacks to get the "right" try_number when looking at it at the wrong time (because previously it gave the wrong answer), unfortunately that code will just have to be patched. There are only two cases I know of in the providers codebase -- the openlineage listener, and dbt openlineage.

As a courtesy for backcompat we also add a property _try_number which is just a proxy for try_number, so you'll still be able to access this attr. But it will not behave the same as it did before.
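
As a reader's aid, a small sketch of the increment rule described above (increment only for a genuinely new try, never when resuming from deferral or a sensor reschedule). The helper name and structure are illustrative only; this is not the actual scheduler code.

    from airflow.utils.state import TaskInstanceState

    def is_new_try(previous_state) -> bool:
        # A task coming back from deferral or up_for_reschedule is resuming the
        # same try, so its try_number must not be bumped again.
        return previous_state not in (
            TaskInstanceState.DEFERRED,
            TaskInstanceState.UP_FOR_RESCHEDULE,
        )

    # Conceptually, when the scheduler moves a task instance to SCHEDULED:
    #     if is_new_try(ti.state):
    #         ti.try_number += 1
    # and the value then stays fixed for the entire execution of that try.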

-

Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
---
 airflow/api/common/mark_tasks.py   |   4 -
 .../api_connexion/schemas/task_instance_schema.py  |   2 +-
 .../plugins/decreasing_priority_weight_strategy.py |   2 +-
 airflow/jobs/backfill_job_runner.py|  30 +++-
 airflow/models/dag.py  |   2 +
 airflow/models/dagrun.py   |  16 +-
 airflow/models/taskinstance.py | 146 +---
 airflow/models/taskinstancekey.py  |   1 +
 .../amazon/aws/executors/ecs/ecs_executor.py   |   2 +-
 airflow/providers/dbt/cloud/CHANGELOG.rst  |   5 +
 airflow/providers/dbt/cloud/utils/openlineage.py   |  14 +-
 .../providers/elasticsearch/log/es_task_handler.py |   2 +-
 airflow/providers/openlineage/CHANGELOG.rst|   5 +
 airflow/providers/openlineage/plugins/listener.py  |  13 +-
 airflow/sensors/base.py|   4 +-
 airflow/utils/log/file_task_handler.py |   8 +-
 airflow/www/utils.py   |   2 +-
 airflow/www/views.py   |  12 +-
 kubernetes_tests/test_kubernetes_pod_operator.py   |   1 +
 newsfragments/39336.significant.rst|   7 +
 tests/cli/commands/test_task_command.py|  31 +++-
 tests/core/test_sentry.py  |   4 +-
 tests/jobs/test_backfill_job.py|  80 +
 tests/jobs/test_scheduler_job.py   |   5 +-
 tests/jobs/test_triggerer_job.py   |   2 +-
 tests/models/test_cleartasks.py| 131 +-
 tests/models/test_dag.py   |  16 +-
 tests/models/test_taskinstance.py  | 189 +++--
 tests/plugins/priority_weight_strategy.py  |   2 +-
 .../amazon/aws/executors/ecs/test_ecs_executor.py  |   2 +-
 .../celery/executors/test_celery_executor.py   |   6 +-
 .../cncf/kubernetes/operators/test_pod.py  |  14 +-
 .../cncf/kubernetes/test_template_rendering.py |   4 +-
 .../providers/openlineage/plugins/test_listener.py |  45 ++---
 tests/providers/smtp/notifications/test_smtp.py|   2 +-
 tests/sensors/test_base.py |  54 --
 tests/test_utils/mock_executor.py  |   1 -
 tests/utils/test_log_handlers.py   |   6 +-
 tests/www/views/test_views_log.py   

(airflow) branch main updated (0f9ad981c3 -> fb169536a3)

2024-05-06 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 0f9ad981c3 Update note about restarting triggerer proccess (#39436)
 add fb169536a3 Only heartbeat if necessary in backfill loop (#39399)

No new revisions were added by this update.

Summary of changes:
 airflow/jobs/backfill_job_runner.py | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)



(airflow) branch main updated (413698ed1d -> 29a9a24a5f)

2024-05-01 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 413698ed1d Use correct prefixes for distribution packages in provider documentation (#39341)
 add 29a9a24a5f Refactor tests re sensor reschedule mode and try_number (#39351)

No new revisions were added by this update.

Summary of changes:
 tests/sensors/test_base.py | 246 -
 1 file changed, 132 insertions(+), 114 deletions(-)



(airflow) branch main updated (6112745b8f -> 4fa9fe119e)

2024-04-29 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 6112745b8f Allowing tasks to start execution directly from triggerer without going to worker (#38674)
 add 4fa9fe119e Rename "try_number" increments that are unrelated to the airflow concept (#39317)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/amazon/aws/hooks/emr.py | 14 --
 airflow/sensors/base.py   | 14 +++---
 2 files changed, 15 insertions(+), 13 deletions(-)



(airflow) branch main updated (eba12409a7 -> b365cbd8ea)

2024-04-23 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from eba12409a7 Add support to search by dag_display_name. (#39008)
 add b365cbd8ea Provide mechanism to serialize airflow exceptions (#39187)

No new revisions were added by this update.

Summary of changes:
 airflow/exceptions.py  | 15 ++
 airflow/serialization/enums.py |  1 +
 airflow/serialization/serialized_objects.py| 20 -
 tests/serialization/test_serialized_objects.py | 40 +-
 4 files changed, 74 insertions(+), 2 deletions(-)



(airflow) branch main updated (f69f9fb6ed -> e76031ba99)

2024-04-22 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from f69f9fb6ed Generalize exception error_id tracing in RPC server (#39186)
 add e76031ba99 Provide mechanism to serialize airflow BaseTrigger object (#39185)

No new revisions were added by this update.

Summary of changes:
 airflow/serialization/enums.py |  1 +
 airflow/serialization/serialized_objects.py| 10 ++
 tests/serialization/test_serialized_objects.py |  6 +-
 3 files changed, 16 insertions(+), 1 deletion(-)



(airflow) branch main updated: Generalize exception error_id tracing in RPC server (#39186)

2024-04-22 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new f69f9fb6ed Generalize exception error_id tracing in RPC server (#39186)
f69f9fb6ed is described below

commit f69f9fb6ed40175dd8a21b4fd007aa88012e511f
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Apr 22 22:29:59 2024 -0700

Generalize exception error_id tracing in RPC server (#39186)

There were other ways in which the RPC server could fail and we should provide a mechanism to find those logs too.
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py | 30 ++
 .../endpoints/test_rpc_api_endpoint.py |  4 +--
 2 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index f2fa53d3ee..c5089718d6 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -107,19 +107,25 @@ def _initialize_map() -> dict[str, Callable]:
 return {f"{func.__module__}.{func.__qualname__}": func for func in 
functions}
 
 
+def log_and_build_error_response(message, status):
+error_id = uuid4()
+server_message = message + f" error_id={error_id}"
+log.exception(server_message)
+client_message = message + f" The server side traceback may be identified with error_id={error_id}"
+return Response(response=client_message, status=status)
+
+
 def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
 """Handle Internal API /internal_api/v1/rpcapi endpoint."""
 log.debug("Got request")
 json_rpc = body.get("jsonrpc")
 if json_rpc != "2.0":
-log.error("Not jsonrpc-2.0 request.")
-return Response(response="Expected jsonrpc 2.0 request.", status=400)
+return log_and_build_error_response(message="Expected jsonrpc 2.0 
request.", status=400)
 
 methods_map = _initialize_map()
 method_name = body.get("method")
 if method_name not in methods_map:
-log.error("Unrecognized method: %s.", method_name)
-return Response(response=f"Unrecognized method: {method_name}.", 
status=400)
+return log_and_build_error_response(message=f"Unrecognized method: 
{method_name}.", status=400)
 
 handler = methods_map[method_name]
 params = {}
@@ -127,12 +133,10 @@ def internal_airflow_api(body: dict[str, Any]) -> 
APIResponse:
 if body.get("params"):
 params_json = body.get("params")
 params = BaseSerialization.deserialize(params_json, 
use_pydantic_models=True)
-except Exception as e:
-log.error("Error when deserializing parameters for method: %s.", 
method_name)
-log.exception(e)
-return Response(response="Error deserializing parameters.", status=400)
+except Exception:
+return log_and_build_error_response(message="Error deserializing 
parameters.", status=400)
 
-log.debug("Calling method %s.", method_name)
+log.debug("Calling method %s\nparams: %s", method_name, params)
 try:
 # Session must be created there as it may be needed by serializer for 
lazy-loaded fields.
 with create_session() as session:
@@ -141,10 +145,4 @@ def internal_airflow_api(body: dict[str, Any]) -> 
APIResponse:
 response = json.dumps(output_json) if output_json is not None else 
None
 return Response(response=response, headers={"Content-Type": 
"application/json"})
 except Exception:
-error_id = uuid4()
-log.exception("Error executing method '%s'; error_id=%s.", 
method_name, error_id)
-return Response(
-response=f"Error executing method '{method_name}'. "
-f"The server side traceback may be identified with 
error_id={error_id}",
-status=500,
-)
+return log_and_build_error_response(message=f"Error executing method 
'{method_name}'.", status=500)
diff --git a/tests/api_internal/endpoints/test_rpc_api_endpoint.py b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
index afa3bc9920..4c312da3a7 100644
--- a/tests/api_internal/endpoints/test_rpc_api_endpoint.py
+++ b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
@@ -149,7 +149,7 @@ class TestRpcApiEndpoint:
 "/internal_api/v1/rpcapi", headers={"Content-Type": 
"application/json"}, data=json.dumps(data)
 )
 assert response.status_code == 400
-assert

(airflow) branch main updated (bcbcb8e39c -> acc2338aed)

2024-04-22 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from bcbcb8e39c Fix docs on what KE pod_override can override (#39169)
 add acc2338aed Fix stacklevel for TaskContextLogger (#39142)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/log/task_context_logger.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(airflow) branch main updated: Use debug level for minischeduler skip (#38976)

2024-04-12 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d03ba594b3 Use debug level for minischeduler skip (#38976)
d03ba594b3 is described below

commit d03ba594b3158c127c1c1b3f1d0c31fb93104367
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Apr 12 16:04:16 2024 -0700

Use debug level for minischeduler skip (#38976)

Now that we are using nowait, it will be more common that the minischeduler skips. Leaving it at info level will cause unnecessary alarm to users.
---
 airflow/models/taskinstance.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 1d0319c0cd..2b8b935d78 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3563,7 +3563,7 @@ class TaskInstance(Base, LoggingMixin):
 
 except OperationalError as e:
 # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
-cls.logger().info(
+cls.logger().debug(
 "Skipping mini scheduling run due to exception: %s",
 e.statement,
 exc_info=True,



(airflow) branch main updated: Make default_action_log an RPC function (#38946)

2024-04-12 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new c450cbfbf9 Make default_action_log an RPC function (#38946)
c450cbfbf9 is described below

commit c450cbfbf9e1b2339ae2329b2898b92c6beda527
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Apr 12 14:43:07 2024 -0700

Make default_action_log an RPC function (#38946)

To use RPC, we need to accept a session, which is provided by the RPC call handler. But the action log callback system may already be forwarding a session, so to avoid a collision, I have made this an internal function instead of making default_action_log an RPC function.
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  4 +-
 airflow/utils/cli_action_loggers.py| 80 --
 2 files changed, 62 insertions(+), 22 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index c428e8e481..b5fd545066 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -47,12 +47,14 @@ def _initialize_map() -> dict[str, Callable]:
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.secrets.metastore import MetastoreBackend
+from airflow.utils.cli_action_loggers import _default_action_log_internal
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
 functions: list[Callable] = [
+_default_action_log_internal,
 _get_template_context,
-_update_rtif,
 _get_ti_db_access,
+_update_rtif,
 DagFileProcessor.update_import_errors,
 DagFileProcessor.manage_slas,
 DagFileProcessorManager.deactivate_stale_dags,
diff --git a/airflow/utils/cli_action_loggers.py 
b/airflow/utils/cli_action_loggers.py
index da56f7db7e..7ac2442a04 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -26,7 +26,13 @@ from __future__ import annotations
 
 import json
 import logging
-from typing import Callable
+from typing import TYPE_CHECKING, Callable
+
+from airflow.api_internal.internal_api_call import internal_api_call
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+from sqlalchemy.orm import Session
 
 logger = logging.getLogger(__name__)
 
@@ -100,32 +106,62 @@ def default_action_log(sub_command, user, task_id, 
dag_id, execution_date, host_
 The difference is this function uses the global ORM session, and pushes a
 ``Log`` row into the database instead of actually logging.
 """
+_default_action_log_internal(
+sub_command=sub_command,
+user=user,
+task_id=task_id,
+dag_id=dag_id,
+execution_date=execution_date,
+host_name=host_name,
+full_command=full_command,
+)
+
+
+@internal_api_call
+@provide_session
+def _default_action_log_internal(
+*,
+sub_command,
+user,
+task_id,
+dag_id,
+execution_date,
+host_name,
+full_command,
+session: Session = NEW_SESSION,
+):
+"""
+RPC portion of default_action_log.
+
+To use RPC, we need to accept a session, which is provided by the RPC call handler.
+But, the action log callback system may already be forwarding a session, so to avoid
+a collision, I have made this internal function instead of making default_action_log
+an RPC function.
+"""
 from sqlalchemy.exc import OperationalError, ProgrammingError
 
 from airflow.models.log import Log
 from airflow.utils import timezone
-from airflow.utils.session import create_session
 
 try:
-with create_session() as session:
-extra = json.dumps({"host_name": host_name, "full_command": full_command})
-# Use bulk_insert_mappings here to avoid importing all models (which using the classes does) early
-# on in the CLI
-session.bulk_insert_mappings(
-Log,
-[
-{
-"event": f"cli_{sub_command}",
-"task_instance": None,
-"owner": user,
-"extra": extra,
-"task_id": task_id,
-"dag_id": dag_id,
-"execution_date": execution_date,
-"dttm": timezone.utcnow(),
-}
-],
-)
+# Use bulk_inser

(airflow) branch main updated: Fix try_number handling when db isolation enabled (#38943)

2024-04-12 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d39367bcbf Fix try_number handling when db isolation enabled (#38943)
d39367bcbf is described below

commit d39367bcbfb1117236f23caa12c28d19daa970c9
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Apr 12 09:17:19 2024 -0700

Fix try_number handling when db isolation enabled (#38943)

There was an error in the refresh_from_db code, and because of try_number inconsistency, the same run was going into two different log files. There is some ugliness here, but some ugliness is unavoidable when dealing with try_number as it is right now.
---
 airflow/models/dagrun.py   |  1 -
 airflow/models/taskinstance.py | 27 --
 airflow/serialization/pydantic/taskinstance.py |  1 -
 airflow/utils/log/file_task_handler.py | 13 -
 tests/models/test_taskinstance.py  | 15 +-
 5 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 20d92cfa95..fb7a2ae6cd 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -650,7 +650,6 @@ class DagRun(Base, LoggingMixin):
 )
 
 @staticmethod
-@internal_api_call
 @provide_session
 def fetch_task_instance(
 dag_id: str,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index b07aed936d..43b388ef68 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -539,7 +539,7 @@ def _refresh_from_db(
 task_instance.end_date = ti.end_date
 task_instance.duration = ti.duration
 task_instance.state = ti.state
-task_instance.try_number = ti._try_number  # private attr to get value unaltered by accessor
+task_instance.try_number = _get_private_try_number(task_instance=ti)
 task_instance.max_tries = ti.max_tries
 task_instance.hostname = ti.hostname
 task_instance.unixname = ti.unixname
@@ -925,7 +925,7 @@ def _handle_failure(
 TaskInstance.save_to_db(failure_context["ti"], session)
 
 
-def _get_try_number(*, task_instance: TaskInstance | TaskInstancePydantic):
+def _get_try_number(*, task_instance: TaskInstance):
 """
 Return the try number that a task number will be when it is actually run.
 
@@ -943,6 +943,23 @@ def _get_try_number(*, task_instance: TaskInstance | TaskInstancePydantic):
 return task_instance._try_number + 1
 
 
+def _get_private_try_number(*, task_instance: TaskInstance | TaskInstancePydantic):
+"""
+Opposite of _get_try_number.
+
+Given the value returned by try_number, return the value of _try_number that
+should produce the same result.
+This is needed for setting _try_number on TaskInstance from the value on PydanticTaskInstance, which has no private attrs.
+
+:param task_instance: the task instance
+
+:meta private:
+"""
+if task_instance.state == TaskInstanceState.RUNNING:
+return task_instance.try_number
+return task_instance.try_number - 1
+
+
 def _set_try_number(*, task_instance: TaskInstance | TaskInstancePydantic, value: int) -> None:
 """
 Set a task try number.
@@ -3000,6 +3017,12 @@ class TaskInstance(Base, LoggingMixin):
 _stop_remaining_tasks(task_instance=ti, session=session)
 else:
 if ti.state == TaskInstanceState.QUEUED:
+from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
+
+if isinstance(ti, TaskInstancePydantic):
+# todo: (AIP-44) we should probably "coalesce" `ti` to TaskInstance before here
+#  e.g. we could make refresh_from_db return a TI and replace ti with that
+raise RuntimeError("Expected TaskInstance here. Further AIP-44 work required.")
 # We increase the try_number to fail the task if it fails to start after sometime
 ti._try_number += 1
 ti.state = State.UP_FOR_RETRY
diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index cf27d755b5..cc52aa9989 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -85,7 +85,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
 duration: Optional[float]
 state: Optional[str]
 try_number: int
-_try_number: int
 max_tries: int
 hostname: str
 unixname: str
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index

(airflow) branch main updated (afe8e82531 -> b90fa78e88)

2024-04-10 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from afe8e82531 Don't wait for DagRun lock in mini scheduler (#38914)
 add b90fa78e88 Fix check of correct dag when remote call for _get_ti (#38909)

No new revisions were added by this update.

Summary of changes:
 airflow/cli/commands/task_command.py | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)



(airflow) branch main updated: Don't wait for DagRun lock in mini scheduler (#38914)

2024-04-10 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new afe8e82531 Don't wait for DagRun lock in mini scheduler (#38914)
afe8e82531 is described below

commit afe8e82531cece4f3aeaea465b985ebd7257e71d
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Apr 10 20:06:37 2024 -0700

Don't wait for DagRun lock in mini scheduler (#38914)

We should just bail if something else is already "minischeduling". We already catch OperationalError so there's nothing else we need to change for this.

Sometimes (e.g. with certain task mapping scenarios) many tasks from the same dag run are trying to "minischedule" at the same time. If something else is already locking, it's probably better to just move on and not wait.
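
A minimal sketch of the "don't wait, just bail" locking pattern described above, written against plain SQLAlchemy rather than Airflow's with_row_locks helper; the model and argument names are hypothetical, and NOWAIT support depends on the database backend.

    from sqlalchemy import select
    from sqlalchemy.exc import OperationalError

    def try_lock_dag_run(session, DagRunModel, dag_id, run_id):
        try:
            return session.execute(
                select(DagRunModel)
                .where(DagRunModel.dag_id == dag_id, DagRunModel.run_id == run_id)
                .with_for_update(nowait=True)  # raise immediately instead of blocking
            ).scalar_one()
        except OperationalError:
            # Another task instance is already "minischeduling" this dag run; skip.
            return None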
---
 airflow/models/taskinstance.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a55ea0fe77..c3bf25e343 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3492,6 +3492,7 @@ class TaskInstance(Base, LoggingMixin):
 run_id=ti.run_id,
 ),
 session=session,
+nowait=True,
 ).one()
 
 task = ti.task



(airflow) branch main updated: Add retry logic for RPC calls (#38910)

2024-04-10 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new cdc7f19b57 Add retry logic for RPC calls (#38910)
cdc7f19b57 is described below

commit cdc7f19b571a99cbbad5091dcb11e2d4f1439fb3
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Apr 10 19:47:20 2024 -0700

Add retry logic for RPC calls (#38910)

I have found that when the RPC server restarts it can take 30-60s for the server to be able to respond to RPC calls. This implements exponential wait for that case. 10 attempts might seem excessive, but I found that 3 or 5 didn't always do the trick.
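
For illustration, the same retry policy applied to a generic JSON-RPC POST; the endpoint, payload shape, and timeout below are placeholders, while the tenacity settings mirror the ones added in this commit (10 attempts, exponential wait starting around one second).

    import logging

    import requests
    import tenacity

    logger = logging.getLogger(__name__)

    @tenacity.retry(
        stop=tenacity.stop_after_attempt(10),
        wait=tenacity.wait_exponential(min=1),
        retry=tenacity.retry_if_exception_type(requests.exceptions.ConnectionError),
        before_sleep=tenacity.before_log(logger, logging.WARNING),
    )
    def post_rpc(endpoint: str, payload: dict) -> bytes:
        # Retries only on connection errors, e.g. while the server is restarting.
        response = requests.post(endpoint, json=payload, timeout=10)
        response.raise_for_status()
        return response.content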
---
 airflow/api_internal/internal_api_call.py | 14 +-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/airflow/api_internal/internal_api_call.py 
b/airflow/api_internal/internal_api_call.py
index 8dcd5dba30..c3a67d03ee 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -19,10 +19,13 @@ from __future__ import annotations
 
 import inspect
 import json
+import logging
 from functools import wraps
 from typing import Callable, TypeVar
 
 import requests
+import tenacity
+from urllib3.exceptions import NewConnectionError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, AirflowException
@@ -32,6 +35,8 @@ from airflow.typing_compat import ParamSpec
 PS = ParamSpec("PS")
 RT = TypeVar("RT")
 
+logger = logging.getLogger(__name__)
+
 
 class InternalApiConfig:
 """Stores and caches configuration for Internal API."""
@@ -96,7 +101,14 @@ def internal_api_call(func: Callable[PS, RT]) -> 
Callable[PS, RT]:
 headers = {
 "Content-Type": "application/json",
 }
-
+from requests.exceptions import ConnectionError
+
+@tenacity.retry(
+stop=tenacity.stop_after_attempt(10),
+wait=tenacity.wait_exponential(min=1),
+retry=tenacity.retry_if_exception_type((NewConnectionError, ConnectionError)),
+before_sleep=tenacity.before_log(logger, logging.WARNING),
+)
 def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
 data = {"jsonrpc": "2.0", "method": method_name, "params": params_json}
 internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()



(airflow) branch main updated: Make _get_ti compatible with RPC (#38570)

2024-04-09 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 0af5d923d9 Make _get_ti compatible with RPC (#38570)
0af5d923d9 is described below

commit 0af5d923d99591576b3758ab3c694d02dbe152bf
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Apr 9 16:34:03 2024 -0700

Make _get_ti compatible with RPC (#38570)

This is for AIP-44. I had to pull out the "db access" parts from `_get_ti` and move them to the RPC function `_get_ti_db_access`. To make that work, I also had to ensure that "task" objects (a.k.a. instances of AbstractOperator) can properly be roundtripped with BaseSerialization.serialize. Up to now they could not be, and they were "manually" serialized as part of SerializedDAG. This changes a bit the way we serialize task objects, and so we had to handle backcompat and update a fair [...]
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |   2 +
 airflow/cli/commands/task_command.py   |  39 -
 airflow/models/taskinstance.py |   1 -
 airflow/serialization/serialized_objects.py|  14 +-
 tests/providers/amazon/aws/links/test_base_aws.py  |   2 +-
 .../amazon/aws/operators/test_emr_serverless.py|  16 +-
 .../google/cloud/operators/test_bigquery.py|   8 +-
 .../google/cloud/operators/test_dataproc.py|  14 +-
 tests/serialization/test_dag_serialization.py  | 184 +++--
 9 files changed, 163 insertions(+), 117 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index 97449810bd..c428e8e481 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -37,6 +37,7 @@ log = logging.getLogger(__name__)
 
 @functools.lru_cache
 def _initialize_map() -> dict[str, Callable]:
+from airflow.cli.commands.task_command import _get_ti_db_access
 from airflow.dag_processing.manager import DagFileProcessorManager
 from airflow.dag_processing.processor import DagFileProcessor
 from airflow.models import Trigger, Variable, XCom
@@ -51,6 +52,7 @@ def _initialize_map() -> dict[str, Callable]:
 functions: list[Callable] = [
 _get_template_context,
 _update_rtif,
+_get_ti_db_access,
 DagFileProcessor.update_import_errors,
 DagFileProcessor.manage_slas,
 DagFileProcessorManager.deactivate_stale_dags,
diff --git a/airflow/cli/commands/task_command.py 
b/airflow/cli/commands/task_command.py
index 05adb0abda..23a1ed460e 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -34,7 +34,7 @@ from pendulum.parsing.exceptions import ParserError
 from sqlalchemy import select
 
 from airflow import settings
-from airflow.api_internal.internal_api_call import InternalApiConfig
+from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
 from airflow.cli.simple_table import AirflowConsole
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagRunNotFound, TaskDeferred, TaskInstanceNotFound
@@ -156,8 +156,10 @@ def _get_dag_run(
 raise ValueError(f"unknown create_if_necessary value: 
{create_if_necessary!r}")
 
 
+@internal_api_call
 @provide_session
-def _get_ti(
+def _get_ti_db_access(
+dag: DAG,
 task: Operator,
 map_index: int,
 *,
@@ -167,9 +169,9 @@ def _get_ti(
 session: Session = NEW_SESSION,
 ) -> tuple[TaskInstance | TaskInstancePydantic, bool]:
 """Get the task instance through DagRun.run_id, if that fails, get the TI 
the old way."""
-dag = task.dag
-if dag is None:
-raise ValueError("Cannot get task instance for a task not assigned to 
a DAG")
+if task.dag_id != dag.dag_id:
+raise ValueError(f"Provided task '{task.task_id}' is not assigned to 
provided dag {dag.dag_id}.")
+
 if not exec_date_or_run_id and not create_if_necessary:
 raise ValueError("Must provide `exec_date_or_run_id` if not 
`create_if_necessary`.")
 if needs_expansion(task):
@@ -201,6 +203,33 @@ def _get_ti(
 return ti, dr_created
 
 
+def _get_ti(
+task: Operator,
+map_index: int,
+*,
+exec_date_or_run_id: str | None = None,
+pool: str | None = None,
+create_if_necessary: CreateIfNecessary = False,
+):
+dag = task.dag
+if dag is None:
+raise ValueError("Cannot get task instance for a task not assigned to 
a DAG")
+
+ti, dr_created = _get_ti_db_access(
+dag=dag,
+task=task,
+map_index=map_index,
+exec_date_or_run_id=exec_date_or_run_id,
+   

(airflow) branch main updated: Fix test side effects from TracebackSession (#38885)

2024-04-09 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 4f706d87fc Fix test side effects from TracebackSession (#38885)
4f706d87fc is described below

commit 4f706d87fc83683e9bb958674626920f2cd41a14
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Apr 9 16:26:03 2024 -0700

Fix test side effects from TracebackSession (#38885)

---

Co-authored-by: Andrey Anshin 
---
 tests/core/test_settings.py | 35 +--
 1 file changed, 17 insertions(+), 18 deletions(-)

diff --git a/tests/core/test_settings.py b/tests/core/test_settings.py
index abaf4a907b..5eac456103 100644
--- a/tests/core/test_settings.py
+++ b/tests/core/test_settings.py
@@ -28,7 +28,7 @@ import pytest
 
 from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.exceptions import AirflowClusterPolicyViolation, 
AirflowConfigException
-from airflow.settings import _ENABLE_AIP_44, TracebackSession, configure_orm
+from airflow.settings import _ENABLE_AIP_44, TracebackSession
 from airflow.utils.session import create_session
 from tests.test_utils.config import conf_vars
 
@@ -65,6 +65,16 @@ def task_must_have_owners(task: BaseOperator):
 """
 
 
+@pytest.fixture
+def clear_internal_api():
+try:
+yield
+finally:
+InternalApiConfig._initialized = False
+InternalApiConfig._use_internal_api = None
+InternalApiConfig._internal_api_endpoint = None
+
+
 class SettingsContext:
 def __init__(self, content: str, module_name: str):
 self.content = content
@@ -276,27 +286,22 @@ class TestEngineArgs:
 ("core", "internal_api_url"): "http://localhost:;,
 }
 )
-def test_get_traceback_session_if_aip_44_enabled():
+def test_get_traceback_session_if_aip_44_enabled(clear_internal_api):
 # ensure we take the database_access_isolation config
 InternalApiConfig._init_values()
 assert InternalApiConfig.get_use_internal_api() is True
 
-# ensure that the Session object is TracebackSession
-configure_orm()
-
-from airflow.settings import Session
-
-assert Session == TracebackSession
-
-# no error to create
 with create_session() as session:
 assert isinstance(session, TracebackSession)
 
+# no error just to create the "session"
+# but below, when we try to use, it will raise
+
 with pytest.raises(
 RuntimeError,
 match="TracebackSession object was used but internal API is 
enabled.",
 ):
-session.hi()
+session.execute()
 
 
 @pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
@@ -307,21 +312,15 @@ def test_get_traceback_session_if_aip_44_enabled():
 }
 )
 @patch("airflow.utils.session.TracebackSession.__new__")
-def test_create_session_ctx_mgr_no_call_methods(mock_new):
+def test_create_session_ctx_mgr_no_call_methods(mock_new, clear_internal_api):
 m = MagicMock()
 mock_new.return_value = m
 # ensure we take the database_access_isolation config
 InternalApiConfig._init_values()
 assert InternalApiConfig.get_use_internal_api() is True
 
-# ensure that the Session object is TracebackSession
-configure_orm()
-
-# no error to create
 with create_session() as session:
 assert isinstance(session, MagicMock)
 assert session == m
 method_calls = [x[0] for x in m.method_calls]
 assert method_calls == []  # commit and close not called when using internal API
-
-# assert mock_session_obj.call_args_list == []



(airflow) branch main updated: Add session that blows up when using internal API (#38563)

2024-04-09 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 04c2ab5be6 Add session that blows up when using internal API (#38563)
04c2ab5be6 is described below

commit 04c2ab5be669550e4c4d1d004ed1fd1461e58f7e
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Apr 9 07:55:24 2024 -0700

Add session that blows up when using internal API (#38563)

Here I add a TracebackSession which is designed to blow up the first time it's used and then provide a traceback for the original call site. I also ensure that this is the Session class that is used when the internal API config is enabled.
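
A standalone sketch of the pattern (not the Airflow class itself): record the creation call site, then fail loudly on any attribute access so the offending caller can be located from the stored stack. The class name here is illustrative.

    import traceback

    class FailOnUseSession:
        def __init__(self):
            # Remember where the object was created so the error can point back to it.
            self.traceback = traceback.extract_stack()

        def __getattr__(self, item):
            raise RuntimeError(
                "This session must not be used when the internal API is enabled.\n"
                + "".join(traceback.format_list(self.traceback))
            )

    session = FailOnUseSession()
    # Any later use, e.g. session.query(...), raises with the creation stack above.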
---
 airflow/settings.py | 39 +++--
 airflow/utils/session.py|  5 
 tests/conftest.py   |  2 --
 tests/core/test_settings.py | 61 +
 4 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/airflow/settings.py b/airflow/settings.py
index 7ead89f34e..fa859fdf42 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -23,6 +23,7 @@ import json
 import logging
 import os
 import sys
+import traceback
 import warnings
 from typing import TYPE_CHECKING, Any, Callable
 
@@ -207,7 +208,11 @@ def configure_vars():
 
 
 class SkipDBTestsSession:
-"""This fake session is used to skip DB tests when 
`_AIRFLOW_SKIP_DB_TESTS` is set."""
+"""
+This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.
+
+:meta private:
+"""
 
 def __init__(self):
 raise AirflowInternalRuntimeError(
@@ -222,6 +227,30 @@ class SkipDBTestsSession:
 pass
 
 
+class TracebackSession:
+"""
+Session that throws error when you try to use it.
+
+Also stores stack at instantiation call site.
+
+:meta private:
+"""
+
+def __init__(self):
+self.traceback = traceback.extract_stack()
+
+def __getattr__(self, item):
+raise RuntimeError(
+"TracebackSession object was used but internal API is enabled. "
+"You'll need to ensure you are making only RPC calls with this 
object. "
+"The stack list below will show where the TracebackSession object 
was created."
++ "\n".join(traceback.format_list(self.traceback))
+)
+
+def remove(*args, **kwargs):
+pass
+
+
 def configure_orm(disable_connection_pool=False, pool_class=None):
 """Configure ORM using SQLAlchemy."""
 from airflow.utils.log.secrets_masker import mask_secret
@@ -242,7 +271,13 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
 
 global Session
 global engine
-if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
+from airflow.api_internal.internal_api_call import InternalApiConfig
+
+if InternalApiConfig.get_use_internal_api():
+Session = TracebackSession
+engine = None
+return
+elif os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
 # Skip DB initialization in unit tests, if DB tests are skipped
 Session = SkipDBTestsSession
 engine = None
diff --git a/airflow/utils/session.py b/airflow/utils/session.py
index b3b610d199..2268b5fb61 100644
--- a/airflow/utils/session.py
+++ b/airflow/utils/session.py
@@ -24,12 +24,17 @@ from typing import Callable, Generator, TypeVar, cast
 from sqlalchemy.orm import Session as SASession
 
 from airflow import settings
+from airflow.api_internal.internal_api_call import InternalApiConfig
+from airflow.settings import TracebackSession
 from airflow.typing_compat import ParamSpec
 
 
 @contextlib.contextmanager
 def create_session() -> Generator[SASession, None, None]:
 """Contextmanager that will create and teardown a session."""
+if InternalApiConfig.get_use_internal_api():
+yield TracebackSession()
+return
 Session = getattr(settings, "Session", None)
 if Session is None:
 raise RuntimeError("Session must be set before!")
diff --git a/tests/conftest.py b/tests/conftest.py
index 05bc72668d..6d102e7268 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -89,8 +89,6 @@ if skip_db_tests:
 # Make sure sqlalchemy will not be usable for pure unit tests even if 
initialized
 os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = "bad_schema:///"
 os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "bad_schema:///"
-# Force database isolation mode for pure unit tests
-os.environ["AIRFLOW__CORE__DATABASE_ACCESS_ISOL

(airflow) branch main updated: Ensure internal api command get the "ready" prefix on start (#38550)

2024-04-02 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new b255012cf0 Ensure internal api command get the "ready" prefix on start 
(#38550)
b255012cf0 is described below

commit b255012cf01ff79357a22a9b5795447c66f2feab
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Apr 2 22:37:17 2024 -0700

Ensure internal api command get the "ready" prefix on start (#38550)

This prefix is used by _get_num_ready_workers_running to determine the number
of ready workers.
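
For context, a rough sketch of how a parent process could count workers once
post_worker_init has prepended the prefix to each worker's process title. The
prefix value and the psutil scan are assumptions for illustration, not the
actual _get_num_ready_workers_running implementation:

    import psutil

    READY_PREFIX = "[ready] "  # stand-in for settings.GUNICORN_WORKER_READY_PREFIX


    def count_ready_workers(master_pid: int) -> int:
        master = psutil.Process(master_pid)
        ready = 0
        for child in master.children():
            # setproctitle rewrites the command line on Linux, so the prefix
            # shows up at the start of a "ready" worker's joined cmdline.
            if " ".join(child.cmdline()).startswith(READY_PREFIX):
                ready += 1
        return ready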
---
 airflow/api_internal/gunicorn_config.py | 33 +
 airflow/cli/commands/internal_api_command.py|  2 ++
 tests/cli/commands/test_internal_api_command.py |  2 ++
 3 files changed, 37 insertions(+)

diff --git a/airflow/api_internal/gunicorn_config.py 
b/airflow/api_internal/gunicorn_config.py
new file mode 100644
index 00..1ed8e1f941
--- /dev/null
+++ b/airflow/api_internal/gunicorn_config.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import setproctitle
+
+from airflow import settings
+
+
+def post_worker_init(_):
+"""
+Set process title.
+
+This is used by airflow.cli.commands.internal_api_command to track the 
status of the worker.
+"""
+old_title = setproctitle.getproctitle()
+setproctitle.setproctitle(settings.GUNICORN_WORKER_READY_PREFIX + 
old_title)
diff --git a/airflow/cli/commands/internal_api_command.py 
b/airflow/cli/commands/internal_api_command.py
index 00a5c4b840..8c25d1fa5a 100644
--- a/airflow/cli/commands/internal_api_command.py
+++ b/airflow/cli/commands/internal_api_command.py
@@ -115,6 +115,8 @@ def internal_api(args):
 str(access_logfile),
 "--error-logfile",
 str(error_logfile),
+"--config",
+"python:airflow.api_internal.gunicorn_config",
 ]
 
 if args.access_logformat and args.access_logformat.strip():
diff --git a/tests/cli/commands/test_internal_api_command.py 
b/tests/cli/commands/test_internal_api_command.py
index f3940278a4..2c41d16f75 100644
--- a/tests/cli/commands/test_internal_api_command.py
+++ b/tests/cli/commands/test_internal_api_command.py
@@ -205,6 +205,8 @@ class TestCliInternalAPI(_ComonCLIGunicornTestClass):
 "-",
 "--error-logfile",
 "-",
+"--config",
+"python:airflow.api_internal.gunicorn_config",
 "--access-logformat",
 "custom_log_format",
 "airflow.cli.commands.internal_api_command:cached_app()",



(airflow) branch main updated (c4439713cf -> 40dbe4b734)

2024-04-02 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from c4439713cf Revert "Delete deprecated AutoML operators and deprecate 
AutoML hook and links (#38418)" (#38633)
 add 40dbe4b734 Can update RenderedTaskInstanceFields over RPC (#38565)

No new revisions were added by this update.

Summary of changes:
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  3 ++-
 airflow/models/renderedtifields.py | 21 -
 airflow/models/taskinstance.py | 19 +--
 airflow/serialization/serialized_objects.py|  4 ++--
 4 files changed, 33 insertions(+), 14 deletions(-)



(airflow) branch main updated: Make _get_template_context an RPC call (#38567)

2024-04-02 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 0010bf1269 Make _get_template_context an RPC call (#38567)
0010bf1269 is described below

commit 0010bf126909a7385b731de80668b91af7cc74e5
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Apr 2 09:25:49 2024 -0700

Make _get_template_context an RPC call (#38567)

Provide a way of serializing the template context over RPC
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  2 ++
 airflow/models/taskinstance.py | 23 +-
 airflow/serialization/enums.py |  1 +
 airflow/serialization/serialized_objects.py| 16 ++-
 4 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index 5074504b8d..243fcfa284 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Callable
 from flask import Response
 
 from airflow.jobs.job import Job, most_recent_job
+from airflow.models.taskinstance import _get_template_context
 from airflow.serialization.serialized_objects import BaseSerialization
 from airflow.utils.session import create_session
 
@@ -48,6 +49,7 @@ def _initialize_map() -> dict[str, Callable]:
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
 functions: list[Callable] = [
+_get_template_context,
 DagFileProcessor.update_import_errors,
 DagFileProcessor.manage_slas,
 DagFileProcessorManager.deactivate_stale_dags,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 14fc0fc8f7..e7fdc5bec1 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -597,6 +597,7 @@ def _clear_next_method_args(*, task_instance: TaskInstance 
| TaskInstancePydanti
 task_instance.next_kwargs = None
 
 
+@internal_api_call
 def _get_template_context(
 *,
 task_instance: TaskInstance | TaskInstancePydantic,
@@ -623,10 +624,30 @@ def _get_template_context(
 
 task = task_instance.task
 if TYPE_CHECKING:
+assert task_instance.task
 assert task
 assert task.dag
-dag: DAG = task.dag
+try:
+dag: DAG = task.dag
+except AirflowException:
+from airflow.serialization.pydantic.taskinstance import 
TaskInstancePydantic
 
+if isinstance(task_instance, TaskInstancePydantic):
+ti = session.scalar(
+select(TaskInstance).where(
+TaskInstance.task_id == task_instance.task_id,
+TaskInstance.dag_id == task_instance.dag_id,
+TaskInstance.run_id == task_instance.run_id,
+TaskInstance.map_index == task_instance.map_index,
+)
+)
+dag = ti.dag_model.serialized_dag.dag
+if hasattr(task_instance.task, "_dag"):  # BaseOperator
+task_instance.task._dag = dag
+else:  # MappedOperator
+task_instance.task.dag = dag
+else:
+raise
 dag_run = task_instance.get_dagrun(session)
 data_interval = dag.get_run_data_interval(dag_run)
 
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 2a4387eeb4..9b7cdbcc73 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -61,4 +61,5 @@ class DagAttributeTypes(str, Enum):
 DATA_SET = "data_set"
 LOG_TEMPLATE = "log_template"
 CONNECTION = "connection"
+TASK_CONTEXT = "task_context"
 ARG_NOT_SET = "arg_not_set"
diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index 16a5c9e481..98d3d3a654 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -67,6 +67,7 @@ from airflow.task.priority_strategy import (
 airflow_priority_weight_strategies_classes,
 )
 from airflow.utils.code_utils import get_python_source
+from airflow.utils.context import Context
 from airflow.utils.docs import get_docs_url
 from airflow.utils.module_loading import import_string, qualname
 from airflow.utils.operator_resources import Resources
@@ -602,6 +603,12 @@ class BaseSerialization:
 )
 elif isinstance(var, Connection):
 return cls._encode(var.to_dict(validate=True), 
type_=DAT.CONNECTION)
+elif var.__class__ == Context:
+d = {}
+for k, v in var._context.items():
+obj = cls.serialize(v, strict=strict, 
use_

(airflow) branch main updated: Remove select_column option in TaskInstance.get_task_instance (#38571)

2024-04-02 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 583fa2da38 Remove select_column option in 
TaskInstance.get_task_instance (#38571)
583fa2da38 is described below

commit 583fa2da387ef08ce3ff999dea9e6e61524b0cb7
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Apr 2 09:16:58 2024 -0700

Remove select_column option in TaskInstance.get_task_instance (#38571)

Fundamentally, what's going on here is that we need a TaskInstance object
instead of a Row object when sending over the wire in an RPC call.  But the
full story on this one is actually somewhat complicated.
It was back in 2.2.0 in #25312 when we converted to querying the column attrs
instead of the TI object (#28900 only refactored this logic into a function).
The reason was to avoid locking the dag_run table, since TI newly had a dag_run
relationship attr.  Now, this causes a problem with AIP-44 because the RPC API
does not know how to serialize a Row object.
This PR switches back to querying a TaskInstance object, but avoids locking
dag_run by using the lazy_load option.  Meanwhile, since try_number is a
horrible attribute (which gives you a different answer depending on the state),
we have to switch back to looking at the underlying private attr instead of the
public accessor.
---
 airflow/models/taskinstance.py| 24 +++-
 tests/models/test_taskinstance.py | 13 +
 2 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2107781041..14fc0fc8f7 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -61,7 +61,7 @@ from sqlalchemy import (
 )
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.ext.mutable import MutableDict
-from sqlalchemy.orm import reconstructor, relationship
+from sqlalchemy.orm import lazyload, reconstructor, relationship
 from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value
 from sqlalchemy.sql.expression import case, select
 
@@ -523,7 +523,6 @@ def _refresh_from_db(
 task_id=task_instance.task_id,
 run_id=task_instance.run_id,
 map_index=task_instance.map_index,
-select_columns=True,
 lock_for_update=lock_for_update,
 session=session,
 )
@@ -534,8 +533,7 @@ def _refresh_from_db(
 task_instance.end_date = ti.end_date
 task_instance.duration = ti.duration
 task_instance.state = ti.state
-# Since we selected columns, not the object, this is the raw value
-task_instance.try_number = ti.try_number
+task_instance.try_number = ti._try_number  # private attr to get value 
unaltered by accessor
 task_instance.max_tries = ti.max_tries
 task_instance.hostname = ti.hostname
 task_instance.unixname = ti.unixname
@@ -914,7 +912,7 @@ def _get_try_number(*, task_instance: TaskInstance | 
TaskInstancePydantic):
 
 :meta private:
 """
-if task_instance.state == TaskInstanceState.RUNNING.RUNNING:
+if task_instance.state == TaskInstanceState.RUNNING:
 return task_instance._try_number
 return task_instance._try_number + 1
 
@@ -1798,18 +1796,18 @@ class TaskInstance(Base, LoggingMixin):
 run_id: str,
 task_id: str,
 map_index: int,
-select_columns: bool = False,
 lock_for_update: bool = False,
 session: Session = NEW_SESSION,
 ) -> TaskInstance | TaskInstancePydantic | None:
 query = (
-session.query(*TaskInstance.__table__.columns) if select_columns 
else session.query(TaskInstance)
-)
-query = query.filter_by(
-dag_id=dag_id,
-run_id=run_id,
-task_id=task_id,
-map_index=map_index,
+session.query(TaskInstance)
+.options(lazyload("dag_run"))  # lazy load dag run to avoid 
locking it
+.filter_by(
+dag_id=dag_id,
+run_id=run_id,
+task_id=task_id,
+map_index=map_index,
+)
 )
 
 if lock_for_update:
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 8dacc839cb..46654d564d 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -4562,3 +4562,16 @@ def test_taskinstance_with_note(create_task_instance, 
session):
 
 assert 
session.query(TaskInstance).filter_by(**filter_kwargs).one_or_none() is None
 assert 
session.query(TaskInstanceNote).filter_by(**filter_kwargs).one_or_none() is None
+
+
+def test__refresh_from_db_should_not_increment_try_number(dag_maker, session):
+with dag_maker():
+ 

(airflow) branch main updated (31e119a0e6 -> eba50465a6)

2024-04-02 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 31e119a0e6 Add executor field to the DB and parameter to the operators 
(#38474)
 add eba50465a6 Don't create session in get_dag if not reading dags from 
database (#38553)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/cli.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)



(airflow) branch main updated (399854fb7d -> 67cb3023af)

2024-04-02 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 399854fb7d Update `pytest_collection_modifyitems` to use correct 
properties (#38665)
 add 67cb3023af Don't create session in _execute_callable if using internal 
API (#38566)

No new revisions were added by this update.

Summary of changes:
 airflow/models/taskinstance.py | 12 +++-
 1 file changed, 7 insertions(+), 5 deletions(-)



(airflow) branch main updated: Update render filename to use internal API (#38558)

2024-04-02 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 279b45625b Update render filename to use internal API (#38558)
279b45625b is described below

commit 279b45625b99d6522ef97611f89c8353d05ca3a6
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Apr 2 08:47:05 2024 -0700

Update render filename to use internal API (#38558)

For AIP-44. Previously we would create a session every time. I refactor the
portion that needs the session into the sub-function _render_filename_db_access.
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  2 +
 airflow/utils/log/file_task_handler.py | 58 +++---
 tests/task/task_runner/test_task_runner.py |  4 +-
 3 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index 11bf5f9359..5074504b8d 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -45,6 +45,7 @@ def _initialize_map() -> dict[str, Callable]:
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.secrets.metastore import MetastoreBackend
+from airflow.utils.log.file_task_handler import FileTaskHandler
 
 functions: list[Callable] = [
 DagFileProcessor.update_import_errors,
@@ -55,6 +56,7 @@ def _initialize_map() -> dict[str, Callable]:
 DagModel.get_current,
 DagFileProcessorManager.clear_nonexistent_import_errors,
 DagWarning.purge_inactive_dag_warnings,
+FileTaskHandler._render_filename_db_access,
 Job._add_to_db,
 Job._fetch_from_db,
 Job._kill,
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index a5a2da6062..95d6849b20 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -32,6 +32,7 @@ from urllib.parse import urljoin
 
 import pendulum
 
+from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.executors.executor_loader import ExecutorLoader
@@ -39,11 +40,16 @@ from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, 
render_template_to_string
 from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
-from airflow.utils.session import create_session
+from airflow.utils.session import provide_session
 from airflow.utils.state import State, TaskInstanceState
 
 if TYPE_CHECKING:
+from pendulum import DateTime
+
+from airflow.models import DagRun
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+from airflow.serialization.pydantic.dag_run import DagRunPydantic
+from airflow.serialization.pydantic.taskinstance import 
TaskInstancePydantic
 
 logger = logging.getLogger(__name__)
 
@@ -134,14 +140,14 @@ def _interleave_logs(*logs):
 last = v
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(ti: TaskInstanceKey | TaskInstance | TaskInstancePydantic, 
session) -> TaskInstance:
 """Given TI | TIKey, return a TI object.
 
 Will raise exception if no TI is found in the database.
 """
-from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+from airflow.models.taskinstance import TaskInstance
 
-if not isinstance(ti, TaskInstanceKey):
+if isinstance(ti, TaskInstance):
 return ti
 val = (
 session.query(TaskInstance)
@@ -255,22 +261,33 @@ class FileTaskHandler(logging.Handler):
 if self.handler:
 self.handler.close()
 
-def _render_filename(self, ti: TaskInstance | TaskInstanceKey, try_number: 
int) -> str:
+@staticmethod
+@internal_api_call
+@provide_session
+def _render_filename_db_access(
+*, ti, try_number: int, session=None
+) -> tuple[DagRun | DagRunPydantic, TaskInstance | TaskInstancePydantic, 
str | None, str | None]:
+ti = _ensure_ti(ti, session)
+dag_run = ti.get_dagrun(session=session)
+template = dag_run.get_log_template(session=session).filename
+str_tpl, jinja_tpl = parse_template_string(template)
+filename = None
+if jinja_tpl:
+if getattr(ti, "task", None) is not None:
+context = ti.get_template_context(session=session)
+else:
+context = Context(

(airflow) branch main updated: Fix duplicate "health" line in internal api openapi config (#38661)

2024-04-01 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new a059ba4f74 Fix duplicate "health" line in internal api openapi config 
(#38661)
a059ba4f74 is described below

commit a059ba4f74aaca793e5eca576e84e67d97c8004e
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Apr 1 16:02:42 2024 -0700

Fix duplicate "health" line in internal api openapi config (#38661)
---
 airflow/api_internal/openapi/internal_api_v1.yaml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/airflow/api_internal/openapi/internal_api_v1.yaml 
b/airflow/api_internal/openapi/internal_api_v1.yaml
index 06fa3be30c..3edacfbc23 100644
--- a/airflow/api_internal/openapi/internal_api_v1.yaml
+++ b/airflow/api_internal/openapi/internal_api_v1.yaml
@@ -73,7 +73,6 @@ paths:
   operationId: health
   deprecated: false
   x-openapi-router-controller: 
airflow.api_internal.endpoints.health_endpoint
-  operationId: health
   tags:
   - JSONRPC
   parameters: []



(airflow) branch main updated (bc7b68b83a -> 51b34bb146)

2024-04-01 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from bc7b68b83a Move cleanup-docker to a shell scripts from composite 
action (#38659)
 add 51b34bb146 Ensure orm models loaded when using RPC API (#38568)

No new revisions were added by this update.

Summary of changes:
 airflow/cli/commands/task_command.py | 4 
 1 file changed, 4 insertions(+)



(airflow) branch main updated: Implement render_templates on TaskInstancePydantic (#38559)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new f74d0e0f26 Implement render_templates on TaskInstancePydantic (#38559)
f74d0e0f26 is described below

commit f74d0e0f2622d6b636de2363f7e44d1c12e1ec0b
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Mar 27 18:26:57 2024 -0700

Implement render_templates on TaskInstancePydantic (#38559)

Part of AIP-44
---
 airflow/serialization/pydantic/taskinstance.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index 7213164ebe..16c4486099 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -129,6 +129,9 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
 def _run_execute_callback(self, context, task):
 TaskInstance._run_execute_callback(self=self, context=context, 
task=task)  # type: ignore[arg-type]
 
+def render_templates(self, context: Context | None = None, jinja_env=None):
+return TaskInstance.render_templates(self=self, context=context, 
jinja_env=jinja_env)  # type: ignore[arg-type]
+
 def init_run_context(self, raw: bool = False) -> None:
 """Set the log context."""
 self.raw = raw



(airflow) branch main updated (4e070ef5d9 -> 5de907504b)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 4e070ef5d9 Make DAG.create_dagrun AIP-44-compatible (#38564)
 add 5de907504b Don't dispose pools when using internal api (#38552)

No new revisions were added by this update.

Summary of changes:
 airflow/task/task_runner/standard_task_runner.py | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)



(airflow) branch main updated: Make DAG.create_dagrun AIP-44-compatible (#38564)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 4e070ef5d9 Make DAG.create_dagrun AIP-44-compatible (#38564)
4e070ef5d9 is described below

commit 4e070ef5d9aa1c313ecd0a66ab87bb5f9425c114
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Mar 27 18:00:22 2024 -0700

Make DAG.create_dagrun AIP-44-compatible (#38564)
---
 airflow/models/dag.py | 54 ---
 1 file changed, 43 insertions(+), 11 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 3e6076e062..1694202918 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -307,6 +307,45 @@ def _triggerer_is_healthy():
 return job and job.is_alive()
 
 
+@internal_api_call
+@provide_session
+def _create_orm_dagrun(
+dag,
+dag_id,
+run_id,
+logical_date,
+start_date,
+external_trigger,
+conf,
+state,
+run_type,
+dag_hash,
+creating_job_id,
+data_interval,
+session,
+):
+run = DagRun(
+dag_id=dag_id,
+run_id=run_id,
+execution_date=logical_date,
+start_date=start_date,
+external_trigger=external_trigger,
+conf=conf,
+state=state,
+run_type=run_type,
+dag_hash=dag_hash,
+creating_job_id=creating_job_id,
+data_interval=data_interval,
+)
+session.add(run)
+session.flush()
+run.dag = dag
+# create the associated task instances
+# state is None at the moment of creation
+run.verify_integrity(session=session)
+return run
+
+
 @functools.total_ordering
 class DAG(LoggingMixin):
 """
@@ -3023,10 +3062,11 @@ class DAG(LoggingMixin):
 copied_params.update(conf or {})
 copied_params.validate()
 
-run = DagRun(
+run = _create_orm_dagrun(
+dag=self,
 dag_id=self.dag_id,
 run_id=run_id,
-execution_date=logical_date,
+logical_date=logical_date,
 start_date=start_date,
 external_trigger=external_trigger,
 conf=conf,
@@ -3035,16 +3075,8 @@ class DAG(LoggingMixin):
 dag_hash=dag_hash,
 creating_job_id=creating_job_id,
 data_interval=data_interval,
+session=session,
 )
-session.add(run)
-session.flush()
-
-run.dag = self
-
-# create the associated task instances
-# state is None at the moment of creation
-run.verify_integrity(session=session)
-
 return run
 
 @classmethod



(airflow) branch main updated: Handle optional session in _refresh_from_db (#38572)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 5c7b3e9fa7 Handle optional session in _refresh_from_db (#38572)
5c7b3e9fa7 is described below

commit 5c7b3e9fa7e9a46044f02ef7a31ebc0344cfb816
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Mar 27 17:59:44 2024 -0700

Handle optional session in _refresh_from_db (#38572)

Needed for AIP-44
---
 airflow/models/taskinstance.py | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a254130e00..ca680d6e24 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -491,7 +491,10 @@ def _execute_task(task_instance: TaskInstance | 
TaskInstancePydantic, context: C
 
 
 def _refresh_from_db(
-*, task_instance: TaskInstance | TaskInstancePydantic, session: Session, 
lock_for_update: bool = False
+*,
+task_instance: TaskInstance | TaskInstancePydantic,
+session: Session | None = None,
+lock_for_update: bool = False,
 ) -> None:
 """
 Refresh the task instance from the database based on the primary key.
@@ -504,7 +507,7 @@ def _refresh_from_db(
 
 :meta private:
 """
-if task_instance in session:
+if session and task_instance in session:
 session.refresh(task_instance, 
TaskInstance.__mapper__.column_attrs.keys())
 
 ti = TaskInstance.get_task_instance(



(airflow) branch main updated (179b963782 -> e197339548)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 179b963782 Don't dispose sqlalchemy engine when using internal api 
(#38562)
 add e197339548 Implement _run_execute_callback on TaskInstancePydantic 
(#38560)

No new revisions were added by this update.

Summary of changes:
 airflow/serialization/pydantic/taskinstance.py | 3 +++
 1 file changed, 3 insertions(+)



(airflow) branch main updated (31c07ec7e6 -> e364259759)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 31c07ec7e6 Don't check migrations when internal API enabled (#38556)
 add e364259759 Use fetch_dagrun directly to avoid session creation (#38557)

No new revisions were added by this update.

Summary of changes:
 airflow/cli/commands/task_command.py | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)



(airflow) branch main updated (07fd17a32e -> 179b963782)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 07fd17a32e Make type annotation less confusing in task_command.py 
(#38561)
 add 179b963782 Don't dispose sqlalchemy engine when using internal api 
(#38562)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/celery/executors/celery_executor_utils.py | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)



(airflow) branch main updated (e364259759 -> 07fd17a32e)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from e364259759 Use fetch_dagrun directly to avoid session creation (#38557)
 add 07fd17a32e Make type annotation less confusing in task_command.py 
(#38561)

No new revisions were added by this update.

Summary of changes:
 airflow/cli/commands/task_command.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)



(airflow) branch main updated (00f35e84f8 -> 31c07ec7e6)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 00f35e84f8 Don't actually check the db when using internal API (#38554)
 add 31c07ec7e6 Don't check migrations when internal API enabled (#38556)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/cli.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)



(airflow) branch main updated (2ad923a001 -> 00f35e84f8)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 2ad923a001 Reraise of AirflowOptionalProviderFeatureException should be direct (#38555)
 add 00f35e84f8 Don't actually check the db when using internal API (#38554)

No new revisions were added by this update.

Summary of changes:
 airflow/cli/commands/db_command.py | 3 +++
 1 file changed, 3 insertions(+)



(airflow) branch main updated (2227414489 -> 2ad923a001)

2024-03-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 2227414489 Turn common.io xcom exception into 
OptionalProviderFeatureException (#38543)
 add 2ad923a001 Reraise of AirflowOptionalProviderFeatureException should be direct (#38555)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/celery/cli/celery_command.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



(airflow) branch main updated: Ensure __exit__ is called in decorator context managers (#38383)

2024-03-21 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 095c5fe313 Ensure __exit__ is called in decorator context managers 
(#38383)
095c5fe313 is described below

commit 095c5fe3137e2cb6d45e8f3184bae149cb2850d1
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Mar 21 14:06:49 2024 -0700

Ensure __exit__ is called in decorator context managers (#38383)

In #36800 the author fixed a zombie scheduler issue arising from a context
manager's exit not being called, which left the subprocess running.  It was
fixed by explicitly calling the `close` function on an ExitStack-managed
context manager.  A simpler, cleaner, more standard solution is to "fix" the
underlying context managers by wrapping the yield in a try / finally.
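
A generic illustration of the bug class being fixed (plain Python, not Airflow
code): in a generator-based context manager, statements after a bare `yield`
are skipped when the `with` body raises, so cleanup never runs unless the yield
is wrapped in try / finally:

    from contextlib import contextmanager


    @contextmanager
    def bad_cm():
        print("setup")
        yield
        print("cleanup")          # skipped when the with-body raises


    @contextmanager
    def good_cm():
        print("setup")
        try:
            yield
        finally:
            print("cleanup")      # always runs, even on exceptions


    try:
        with bad_cm():
            raise RuntimeError("boom")
    except RuntimeError:
        pass                      # "cleanup" was never printed

    try:
        with good_cm():
            raise RuntimeError("boom")
    except RuntimeError:
        pass                      # "cleanup" was printed on the way out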
---
 airflow/cli/commands/celery_command.py |  8 ---
 airflow/cli/commands/scheduler_command.py  | 32 +++---
 airflow/providers/celery/cli/celery_command.py |  8 ---
 tests/cli/commands/test_scheduler_command.py   |  3 ---
 4 files changed, 23 insertions(+), 28 deletions(-)

diff --git a/airflow/cli/commands/celery_command.py 
b/airflow/cli/commands/celery_command.py
index ae8b9c1925..ed6c0dbbd8 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -91,9 +91,11 @@ def _serve_logs(skip_serve_logs: bool = False):
 if skip_serve_logs is False:
 sub_proc = Process(target=serve_logs)
 sub_proc.start()
-yield
-if sub_proc:
-sub_proc.terminate()
+try:
+yield
+finally:
+if sub_proc:
+sub_proc.terminate()
 
 
 @after_setup_logger.connect()
diff --git a/airflow/cli/commands/scheduler_command.py 
b/airflow/cli/commands/scheduler_command.py
index 0b5cac8857..2a55ca2373 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 
 import logging
 from argparse import Namespace
-from contextlib import ExitStack, contextmanager
+from contextlib import contextmanager
 from multiprocessing import Process
 
 from airflow import settings
@@ -45,18 +45,8 @@ def _run_scheduler_job(args) -> None:
 
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)
 InternalApiConfig.force_database_direct_access()
 enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
-with ExitStack() as stack:
-stack.enter_context(_serve_logs(args.skip_serve_logs))
-stack.enter_context(_serve_health_check(enable_health_check))
-
-try:
-run_job(job=job_runner.job, execute_callable=job_runner._execute)
-except Exception:
-log.exception("Exception when running scheduler job")
-raise
-finally:
-# Ensure that the contexts are closed
-stack.close()
+with _serve_logs(args.skip_serve_logs), 
_serve_health_check(enable_health_check):
+run_job(job=job_runner.job, execute_callable=job_runner._execute)
 
 
 @cli_utils.action_cli
@@ -84,9 +74,11 @@ def _serve_logs(skip_serve_logs: bool = False):
 if skip_serve_logs is False:
 sub_proc = Process(target=serve_logs)
 sub_proc.start()
-yield
-if sub_proc:
-sub_proc.terminate()
+try:
+yield
+finally:
+if sub_proc:
+sub_proc.terminate()
 
 
 @contextmanager
@@ -96,6 +88,8 @@ def _serve_health_check(enable_health_check: bool = False):
 if enable_health_check:
 sub_proc = Process(target=serve_health_check)
 sub_proc.start()
-yield
-if sub_proc:
-sub_proc.terminate()
+try:
+yield
+finally:
+if sub_proc:
+sub_proc.terminate()
diff --git a/airflow/providers/celery/cli/celery_command.py 
b/airflow/providers/celery/cli/celery_command.py
index fff46090aa..f7682b9abf 100644
--- a/airflow/providers/celery/cli/celery_command.py
+++ b/airflow/providers/celery/cli/celery_command.py
@@ -107,9 +107,11 @@ def _serve_logs(skip_serve_logs: bool = False):
 if skip_serve_logs is False:
 sub_proc = Process(target=serve_logs)
 sub_proc.start()
-yield
-if sub_proc:
-sub_proc.terminate()
+try:
+yield
+finally:
+if sub_proc:
+sub_proc.terminate()
 
 
 @after_setup_logger.connect()
diff --git a/tests/cli/commands/test_scheduler_command.py 
b/tests/cli/commands/test_scheduler_command.py
index 2853763563..b6d6a9d921 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -165,10 +165,8 @@ class TestSchedulerCommand:
 @mock.patch("air

(airflow) branch main updated: Implement `set_state` on `TaskInstancePydantic` (#38297)

2024-03-20 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 5ea5f05b3f Implement `set_state` on `TaskInstancePydantic` (#38297)
5ea5f05b3f is described below

commit 5ea5f05b3ffd12ccf58a7ef5d0f31fca4c2b3fec
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Mar 20 09:24:06 2024 -0700

Implement `set_state` on `TaskInstancePydantic` (#38297)
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  1 +
 airflow/models/taskinstance.py | 41 +++---
 airflow/serialization/pydantic/taskinstance.py |  3 ++
 3 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index 7a8f2edfdb..656fc70002 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -81,6 +81,7 @@ def _initialize_map() -> dict[str, Callable]:
 TaskInstance._check_and_change_state_before_execution,
 TaskInstance.get_task_instance,
 TaskInstance._get_dagrun,
+TaskInstance._set_state,
 TaskInstance.fetch_handle_failure_context,
 TaskInstance.save_to_db,
 TaskInstance._schedule_downstream_tasks,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index b16fa633a6..ad956e68b4 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -62,7 +62,7 @@ from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.ext.mutable import MutableDict
 from sqlalchemy.orm import reconstructor, relationship
 from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value
-from sqlalchemy.sql.expression import case
+from sqlalchemy.sql.expression import case, select
 
 from airflow import settings
 from airflow.api_internal.internal_api_call import internal_api_call
@@ -1845,6 +1845,32 @@ class TaskInstance(Base, LoggingMixin):
 """Returns a tuple that identifies the task instance uniquely."""
 return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, 
self.try_number, self.map_index)
 
+@staticmethod
+@internal_api_call
+def _set_state(ti: TaskInstance | TaskInstancePydantic, state, session: 
Session) -> bool:
+if not isinstance(ti, TaskInstance):
+ti = session.scalars(
+select(TaskInstance).where(
+TaskInstance.task_id == ti.task_id,
+TaskInstance.dag_id == ti.dag_id,
+TaskInstance.run_id == ti.run_id,
+TaskInstance.map_index == ti.map_index,
+)
+).one()
+
+if ti.state == state:
+return False
+
+current_time = timezone.utcnow()
+ti.log.debug("Setting task state for %s to %s", ti, state)
+ti.state = state
+ti.start_date = ti.start_date or current_time
+if ti.state in State.finished or ti.state == 
TaskInstanceState.UP_FOR_RETRY:
+ti.end_date = ti.end_date or current_time
+ti.duration = (ti.end_date - ti.start_date).total_seconds()
+session.merge(ti)
+return True
+
 @provide_session
 def set_state(self, state: str | None, session: Session = NEW_SESSION) -> 
bool:
 """
@@ -1854,18 +1880,7 @@ class TaskInstance(Base, LoggingMixin):
 :param session: SQLAlchemy ORM Session
 :return: Was the state changed
 """
-if self.state == state:
-return False
-
-current_time = timezone.utcnow()
-self.log.debug("Setting task state for %s to %s", self, state)
-self.state = state
-self.start_date = self.start_date or current_time
-if self.state in State.finished or self.state == 
TaskInstanceState.UP_FOR_RETRY:
-self.end_date = self.end_date or current_time
-self.duration = (self.end_date - self.start_date).total_seconds()
-session.merge(self)
-return True
+return self._set_state(ti=self, state=state, session=session)
 
 @property
 def is_premature(self) -> bool:
diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index 44db550e5b..7c60c5afc5 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -123,6 +123,9 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
 def clear_xcom_data(self, session: Session | None = None):
 TaskInstance._clear_xcom_data(ti=self, session=session)
 
+def set_state(self, state, session: Session | None = None) 

(airflow) branch main updated: Implement get_dagrun on TaskInstancePydantic (#38295)

2024-03-20 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new b3f54e046a Implement get_dagrun on TaskInstancePydantic (#38295)
b3f54e046a is described below

commit b3f54e046a7f663a3a2e1754b8e2b1c153227df4
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Mar 20 01:02:31 2024 -0700

Implement get_dagrun on TaskInstancePydantic (#38295)

Co-authored-by: Vincent <97131062+vincb...@users.noreply.github.com>
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  1 +
 airflow/models/taskinstance.py | 13 +
 airflow/serialization/pydantic/taskinstance.py |  6 ++
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index d0bb10117d..7a8f2edfdb 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -80,6 +80,7 @@ def _initialize_map() -> dict[str, Callable]:
 SerializedDagModel.get_serialized_dag,
 TaskInstance._check_and_change_state_before_execution,
 TaskInstance.get_task_instance,
+TaskInstance._get_dagrun,
 TaskInstance.fetch_handle_failure_context,
 TaskInstance.save_to_db,
 TaskInstance._schedule_downstream_tasks,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6f8130c4aa..0d27404292 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2115,6 +2115,14 @@ class TaskInstance(Base, LoggingMixin):
 """Check on whether the task instance is in the right state and 
timeframe to be retried."""
 return self.state == TaskInstanceState.UP_FOR_RETRY and 
self.next_retry_datetime() < timezone.utcnow()
 
+@staticmethod
+@internal_api_call
+def _get_dagrun(dag_id, run_id, session) -> DagRun:
+from airflow.models.dagrun import DagRun  # Avoid circular import
+
+dr = session.query(DagRun).filter(DagRun.dag_id == dag_id, 
DagRun.run_id == run_id).one()
+return dr
+
 @provide_session
 def get_dagrun(self, session: Session = NEW_SESSION) -> DagRun:
 """
@@ -2131,13 +2139,10 @@ class TaskInstance(Base, LoggingMixin):
 self.dag_run.dag = self.task.dag
 return self.dag_run
 
-from airflow.models.dagrun import DagRun  # Avoid circular import
-
-dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, 
DagRun.run_id == self.run_id).one()
+dr = self._get_dagrun(self.dag_id, self.run_id, session)
 if getattr(self, "task", None) is not None:
 if TYPE_CHECKING:
 assert self.task
-
 dr.dag = self.task.dag
 # Record it in the instance for next time. This means that 
`self.execution_date` will work correctly
 set_committed_value(self, "dag_run", dr)
diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index 51379fdf6d..44db550e5b 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -176,11 +176,9 @@ class TaskInstancePydantic(BaseModelPydantic, 
LoggingMixin):
 
 :param session: SQLAlchemy ORM Session
 
-TODO: make it works for AIP-44
-
-:return: Pydantic serialized version of DaGrun
+:return: Pydantic serialized version of DagRun
 """
-raise NotImplementedError()
+return TaskInstance._get_dagrun(dag_id=self.dag_id, 
run_id=self.run_id, session=session)
 
 def _execute_task(self, context, task_orig):
 """



(airflow) branch main updated (93814d39ed -> 0eb1405617)

2024-03-20 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 93814d39ed Implement get_log_template on DagRunPydantic (#38303)
 add 0eb1405617 Implement clear_xcom_data on TaskInstancePydantic (#38302)

No new revisions were added by this update.

Summary of changes:
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  1 +
 airflow/models/taskinstance.py | 21 ++---
 airflow/serialization/pydantic/taskinstance.py |  3 +++
 3 files changed, 18 insertions(+), 7 deletions(-)



(airflow) branch main updated (b52e12224f -> 93814d39ed)

2024-03-20 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from b52e12224f Add upper limit to planned calendar events calculation 
(#38310)
 add 93814d39ed Implement get_log_template on DagRunPydantic (#38303)

No new revisions were added by this update.

Summary of changes:
 airflow/serialization/pydantic/dag_run.py | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)



(airflow) branch main updated (d3ef6736ea -> c6bc052980)

2024-03-19 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from d3ef6736ea Add Matomo as an option for analytics_tool. (#38221)
 add c6bc052980 Add default for `task` on TaskInstance / fix attrs on 
TaskInstancePydantic (#37854)

No new revisions were added by this update.

Summary of changes:
 airflow/example_dags/plugins/event_listener.py |  8 ++-
 airflow/executors/base_executor.py |  3 +
 airflow/executors/debug_executor.py|  3 +
 airflow/jobs/local_task_job_runner.py  |  2 +
 airflow/lineage/__init__.py|  5 +-
 airflow/models/dagrun.py   | 13 +++-
 airflow/models/renderedtifields.py |  4 ++
 airflow/models/skipmixin.py|  3 +-
 airflow/models/taskinstance.py | 77 --
 .../providers/elasticsearch/log/es_task_handler.py |  2 +
 airflow/providers/openlineage/plugins/listener.py  |  8 ++-
 airflow/providers/openlineage/plugins/macros.py|  7 +-
 airflow/serialization/pydantic/taskinstance.py | 15 +++--
 airflow/task/priority_strategy.py  |  6 ++
 airflow/task/task_runner/standard_task_runner.py   |  2 +
 airflow/ti_deps/dep_context.py |  2 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py|  2 +
 airflow/ti_deps/deps/trigger_rule_dep.py   | 25 +++
 airflow/utils/log/file_task_handler.py | 13 ++--
 19 files changed, 172 insertions(+), 28 deletions(-)



(airflow) branch main updated (19ee4fe519 -> 72d19565d8)

2024-03-12 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 19ee4fe519 Fix graph task state border color (#38084)
 add 72d19565d8 Handle re-serialization of pydantic models (#37855)

No new revisions were added by this update.

Summary of changes:
 airflow/serialization/serialized_objects.py| 34 +
 tests/serialization/test_serialized_objects.py | 99 ++
 2 files changed, 109 insertions(+), 24 deletions(-)



(airflow) branch main updated (83316b8158 -> 1726b9372b)

2024-03-04 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 83316b8158 Prepare docs 1st wave (RC1) March 2024 (#37876)
 add 1726b9372b Session is not used in _do_render_template_fields (#37856)

No new revisions were added by this update.

Summary of changes:
 airflow/models/abstractoperator.py | 4 
 airflow/models/mappedoperator.py   | 1 -
 airflow/template/templater.py  | 5 -
 3 files changed, 10 deletions(-)



(airflow) branch main updated: Add conditional logic for dataset triggering (#37016)

2024-02-21 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new f971232ab4 Add conditional logic for dataset triggering (#37016)
f971232ab4 is described below

commit f971232ab4a636a1f54a349041a7e22476b8b2dc
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Feb 21 11:24:21 2024 -0800

Add conditional logic for dataset triggering (#37016)

Add conditional logic for dataset-triggered dags so that we can schedule
based on dataset1 OR dataset2.

This PR only implements the underlying classes, DatasetAny and DatasetAll. 
In a followup PR we will add more convenient syntax for this, specifically the 
| and & symbols, e.g. (dataset1 | dataset2) & dataset3.

-

Co-authored-by: Ankit Chaurasia <8670962+sunank...@users.noreply.github.com>
Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
Co-authored-by: Wei Lee 
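
As a usage sketch only (dataset URIs, dag IDs, and start_date are made up;
imports follow the diff below, and the `|` / `&` operator syntax is
deliberately left to the follow-up PR mentioned above):

    import datetime

    from airflow.datasets import Dataset
    from airflow.models.dag import DAG
    from airflow.models.dataset import DatasetAll, DatasetAny

    upstream_a = Dataset("s3://bucket/upstream_a")
    upstream_b = Dataset("s3://bucket/upstream_b")
    upstream_c = Dataset("s3://bucket/upstream_c")

    # Trigger when upstream_a OR upstream_b has been updated.
    with DAG(
        dag_id="dataset_or_consumer",
        start_date=datetime.datetime(2024, 1, 1),
        schedule=DatasetAny(upstream_a, upstream_b),
    ):
        ...

    # Trigger only after BOTH datasets have been updated; per the diff below,
    # a plain list of datasets is now wrapped in DatasetAll the same way.
    with DAG(
        dag_id="dataset_and_consumer",
        start_date=datetime.datetime(2024, 1, 1),
        schedule=DatasetAll(upstream_a, upstream_c),
    ):
        ...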
---
 airflow/models/dag.py | 102 +-
 airflow/models/dataset.py |  49 ++-
 airflow/serialization/enums.py|   2 +
 airflow/serialization/schema.json |  36 -
 airflow/serialization/serialized_objects.py   |  29 +++-
 airflow/timetables/datasets.py|  32 +++--
 tests/cli/commands/test_dag_command.py|  12 +-
 tests/datasets/test_dataset.py| 196 ++
 tests/serialization/test_dag_serialization.py |  14 +-
 tests/timetables/test_datasets_timetable.py   |   1 -
 10 files changed, 409 insertions(+), 64 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index dd43568657..237759010a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import asyncio
-import collections
 import copy
 import functools
 import itertools
@@ -31,7 +30,7 @@ import time
 import traceback
 import warnings
 import weakref
-from collections import deque
+from collections import abc, defaultdict, deque
 from contextlib import ExitStack
 from datetime import datetime, timedelta
 from inspect import signature
@@ -99,6 +98,13 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.models.dagcode import DagCode
 from airflow.models.dagpickle import DagPickle
 from airflow.models.dagrun import RUN_ID_REGEX, DagRun
+from airflow.models.dataset import (
+DatasetAll,
+DatasetAny,
+DatasetBooleanCondition,
+DatasetDagRunQueue,
+DatasetModel,
+)
 from airflow.models.param import DagParam, ParamsDict
 from airflow.models.taskinstance import (
 Context,
@@ -462,7 +468,7 @@ class DAG(LoggingMixin):
 on_success_callback: None | DagStateChangeCallback | 
list[DagStateChangeCallback] = None,
 on_failure_callback: None | DagStateChangeCallback | 
list[DagStateChangeCallback] = None,
 doc_md: str | None = None,
-params: collections.abc.MutableMapping | None = None,
+params: abc.MutableMapping | None = None,
 access_control: dict | None = None,
 is_paused_upon_creation: bool | None = None,
 jinja_environment_kwargs: dict | None = None,
@@ -580,25 +586,28 @@ class DAG(LoggingMixin):
 
 self.timetable: Timetable
 self.schedule_interval: ScheduleInterval
-self.dataset_triggers: Collection[Dataset] = []
-
+self.dataset_triggers: DatasetBooleanCondition | None = None
+if isinstance(schedule, (DatasetAll, DatasetAny)):
+self.dataset_triggers = schedule
 if isinstance(schedule, Collection) and not isinstance(schedule, str):
 from airflow.datasets import Dataset
 
 if not all(isinstance(x, Dataset) for x in schedule):
 raise ValueError("All elements in 'schedule' should be 
datasets")
-self.dataset_triggers = list(schedule)
+self.dataset_triggers = DatasetAll(*schedule)
 elif isinstance(schedule, Timetable):
 timetable = schedule
 elif schedule is not NOTSET:
 schedule_interval = schedule
 
-if self.dataset_triggers:
+if isinstance(schedule, DatasetOrTimeSchedule):
+self.timetable = schedule
+self.dataset_triggers = self.timetable.datasets
+self.schedule_interval = self.timetable.summary
+elif self.dataset_triggers:
 self.timetable = DatasetTriggeredTimetable()
 self.schedule_interval = self.timetable.summary
 elif timetable:
-if isinstance(timetable, DatasetOrTimeSchedule):
-self.dataset_triggers = timetable.datasets
 self.timetable = timetable
 self.schedule_interval = self.timetable.summa

(airflow) branch main updated: Simplify query for orphaned tasks (#36566)

2024-01-22 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 63e93d72c2 Simplify query for orphaned tasks (#36566)
63e93d72c2 is described below

commit 63e93d72c2543f4cd476e01e6415105d2135816e
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Jan 22 10:30:56 2024 -0800

Simplify query for orphaned tasks (#36566)

Two changes here.

First, previously we ended up with two joins to DagRun because the dag_run
relationship attr is `lazy="joined"` and sqlalchemy was not using it.  By
setting it to be lazy, we eliminate the extra join, and we also don't ask for
any dag_run columns (previously the generated query asked for all of them,
even though we try to limit them via options further down).

Second, we use an inner join for queued_by_job.  The outer join was only there
to handle tasks in flight during the upgrade to 2.0.
---
 airflow/jobs/scheduler_job_runner.py | 11 ---
 tests/jobs/test_scheduler_job.py | 27 ---
 2 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 627e0d1468..c0c9474913 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -33,7 +33,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection, 
Iterable, Iterator
 
 from sqlalchemy import and_, delete, func, not_, or_, select, text, update
 from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import joinedload, load_only, make_transient, selectinload
+from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, 
selectinload
 from sqlalchemy.sql import expression
 
 from airflow import settings
@@ -1633,13 +1633,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
 query = (
 select(TI)
+.options(lazyload("dag_run"))  # avoids double join to 
dag_run
 .where(TI.state.in_(State.adoptable_states))
-# outerjoin is because we didn't use to have 
queued_by_job
-# set, so we need to pick up anything pre upgrade. 
This (and the
-# "or queued_by_job_id IS NONE") can go as soon as 
scheduler HA is
-# released.
-.outerjoin(TI.queued_by_job)
-.where(or_(TI.queued_by_job_id.is_(None), Job.state != 
JobState.RUNNING))
+.join(TI.queued_by_job)
+.where(Job.state.is_distinct_from(JobState.RUNNING))
 .join(TI.dag_run)
 .where(
 DagRun.run_type != DagRunType.BACKFILL_JOB,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8e745e1e08..08d994d50c 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1622,10 +1622,15 @@ class TestSchedulerJob:
 start_date=DEFAULT_DATE,
 session=session,
 )
+scheduler_job = Job()
+session.add(scheduler_job)
+session.commit()
 ti = dr.get_task_instance(task_id=op1.task_id, session=session)
 ti.state = State.QUEUED
+ti.queued_by_job_id = scheduler_job.id
 ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
 ti2.state = State.QUEUED
+ti2.queued_by_job_id = scheduler_job.id
 session.commit()
 
 processor = mock.MagicMock()
@@ -1636,6 +1641,7 @@ class TestSchedulerJob:
 self.job_runner.adopt_or_reset_orphaned_tasks()
 
 ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+
 assert ti.state == State.NONE
 
 ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
@@ -3153,19 +3159,21 @@ class TestSchedulerJob:
 "adoptable_state",
 list(sorted(State.adoptable_states)),
 )
-def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state):
+def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, 
session):
 dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name
 with dag_maker(dag_id=dag_id, schedule="@daily"):
 task_id = dag_id + "_task"
 EmptyOperator(task_id=task_id)
-
+old_job = Job()
+session.add(old_job)
+session.commit()
 scheduler_job = Job()
 self.job_runner = SchedulerJobRunner(job=scheduler_job, 
subdir=os.devnull)
-session = settings.Session()
 
 dr1 = dag_maker.create_dagrun(external_trigger=True)
 ti = dr

(airflow) branch main updated: Optimize max_execution_date query in single dag case (#33242)

2024-01-22 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 10c04a4efc Optimize max_execution_date query in single dag case 
(#33242)
10c04a4efc is described below

commit 10c04a4efcee6b92f8057a5a7dbd21d9ca2de710
Author: Josh Owen 
AuthorDate: Mon Jan 22 11:08:25 2024 -0500

Optimize max_execution_date query in single dag case (#33242)

We can make better use of an index when we're only dealing with one dag, 
which is a common case.
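
A hedged sketch of the two query shapes (not the exact scheduler SQL); DagRun
and DagRunType are the models from the diff, import paths assume Airflow ~2.8,
and the dag id values are placeholders:

    from sqlalchemy import func, select

    from airflow.models.dagrun import DagRun
    from airflow.utils.types import DagRunType

    dag_ids = ["example_a", "example_b"]  # placeholder dag ids
    dag_id = "example_a"                  # placeholder single dag id

    # General case: one GROUP BY across every dag id.
    many_dags = (
        select(DagRun.dag_id, func.max(DagRun.execution_date))
        .where(
            DagRun.dag_id.in_(dag_ids),
            DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)),
        )
        .group_by(DagRun.dag_id)
    )

    # Single-dag fast path: a plain MAX() that an index led by dag_id and
    # execution_date can answer without grouping.
    single_dag = (
        select(func.max(DagRun.execution_date)).where(
            DagRun.dag_id == dag_id,
            DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)),
        )
    )
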

-

Co-authored-by: Elad Kalif <45845474+elad...@users.noreply.github.com>
Co-authored-by: Daniel Standish 
<15932138+dstand...@users.noreply.github.com>
---
 airflow/models/dag.py| 69 +++---
 tests/models/test_dag.py | 86 
 2 files changed, 136 insertions(+), 19 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9ee3409c0d..dac7be010a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -73,7 +73,7 @@ from sqlalchemy import (
 update,
 )
 from sqlalchemy.ext.associationproxy import association_proxy
-from sqlalchemy.orm import backref, joinedload, relationship
+from sqlalchemy.orm import backref, joinedload, load_only, relationship
 from sqlalchemy.sql import Select, expression
 
 import airflow.templates
@@ -3062,27 +3062,13 @@ class DAG(LoggingMixin):
 session.add(orm_dag)
 orm_dags.append(orm_dag)
 
-dag_id_to_last_automated_run: dict[str, DagRun] = {}
+latest_runs: dict[str, DagRun] = {}
 num_active_runs: dict[str, int] = {}
 # Skip these queries entirely if no DAGs can be scheduled to save time.
 if any(dag.timetable.can_be_scheduled for dag in dags):
 # Get the latest automated dag run for each existing dag as a 
single query (avoid n+1 query)
-last_automated_runs_subq = (
-select(DagRun.dag_id, 
func.max(DagRun.execution_date).label("max_execution_date"))
-.where(
-DagRun.dag_id.in_(existing_dags),
-or_(DagRun.run_type == DagRunType.BACKFILL_JOB, 
DagRun.run_type == DagRunType.SCHEDULED),
-)
-.group_by(DagRun.dag_id)
-.subquery()
-)
-last_automated_runs = session.scalars(
-select(DagRun).where(
-DagRun.dag_id == last_automated_runs_subq.c.dag_id,
-DagRun.execution_date == 
last_automated_runs_subq.c.max_execution_date,
-)
-)
-dag_id_to_last_automated_run = {run.dag_id: run for run in 
last_automated_runs}
+query = cls._get_latest_runs_query(existing_dags, session)
+latest_runs = {run.dag_id: run for run in session.scalars(query)}
 
 # Get number of active dagruns for all dags we are processing as a 
single query.
 num_active_runs = 
DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session)
@@ -3116,7 +3102,7 @@ class DAG(LoggingMixin):
 orm_dag.timetable_description = dag.timetable.description
 orm_dag.processor_subdir = processor_subdir
 
-last_automated_run: DagRun | None = 
dag_id_to_last_automated_run.get(dag.dag_id)
+last_automated_run: DagRun | None = latest_runs.get(dag.dag_id)
 if last_automated_run is None:
 last_automated_data_interval = None
 else:
@@ -3253,6 +3239,51 @@ class DAG(LoggingMixin):
 for dag in dags:
 cls.bulk_write_to_db(dag.subdags, 
processor_subdir=processor_subdir, session=session)
 
+@classmethod
+def _get_latest_runs_query(cls, dags, session) -> Query:
+"""
+Query the database to retrieve the last automated run for each dag.
+
+:param dags: dags to query
+:param session: sqlalchemy session object
+"""
+if len(dags) == 1:
+# Index optimized fast path to avoid more complicated & slower 
groupby queryplan
+existing_dag_id = list(dags)[0].dag_id
+last_automated_runs_subq = (
+
select(func.max(DagRun.execution_date).label("max_execution_date"))
+.where(
+DagRun.dag_id == existing_dag_id,
+DagRun.run_type.in_((DagRunType.BACKFILL_JOB, 
DagRunType.SCHEDULED)),
+)
+.subquery()
+)
+query = select(DagRun).where(
+DagRun.dag_id == existing_dag_id, DagRun.execution_date == 
last_automated_runs_subq
+)
+else:
+last_automated_runs_subq = (
+select(DagR

(airflow) branch main updated: Update public interface doc re operators (#36767)

2024-01-19 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 66de4bdc9e Update public interface doc re operators (#36767)
66de4bdc9e is described below

commit 66de4bdc9e983cedfab29ccffeee3c503ba78b52
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Jan 19 09:51:49 2024 -0800

Update public interface doc re operators (#36767)

Base classes such as BaseOperator and BaseSensorOperator are public in the 
traditional sense.  We publish them explicitly for the purpose of being 
extended.

But I think that derivatives of them are published only as end products, in 
that their behavior and signature should be subject to semver, but not their 
structure, which should be considered "internal" and not user-facing.  Users 
can extend these classes but they should do so at their own risk, with the 
knowledge that we might refactor.  Otherwise the "public interface" is just 
unnecessarily big.
---
 airflow/models/baseoperator.py   |  6 ++-
 airflow/operators/__init__.py|  7 ++-
 airflow/sensors/__init__.py  |  6 ++-
 docs/apache-airflow/public-airflow-interface.rst | 66 +---
 docs/conf.py |  2 +
 5 files changed, 30 insertions(+), 57 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 2d0244dbbf..1df771231f 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -15,7 +15,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Base operator for all operators."""
+"""
+Base operator for all operators.
+
+:sphinx-autoapi-skip:
+"""
 from __future__ import annotations
 
 import abc
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index f79046e5ae..7fc63e1c8f 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -16,7 +16,12 @@
 # specific language governing permissions and limitations
 # under the License.
 # fmt: off
-"""Operators."""
+"""
+Operators.
+
+:sphinx-autoapi-skip:
+"""
+
 from __future__ import annotations
 
 from airflow.utils.deprecation_tools import add_deprecated_classes
diff --git a/airflow/sensors/__init__.py b/airflow/sensors/__init__.py
index e987f6ef61..3d1627cc03 100644
--- a/airflow/sensors/__init__.py
+++ b/airflow/sensors/__init__.py
@@ -16,7 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 # fmt: off
-"""Sensors."""
+"""
+Sensors.
+
+:sphinx-autoapi-skip:
+"""
 from __future__ import annotations
 
 from airflow.utils.deprecation_tools import add_deprecated_classes
diff --git a/docs/apache-airflow/public-airflow-interface.rst 
b/docs/apache-airflow/public-airflow-interface.rst
index bba6c2795a..c960c9c805 100644
--- a/docs/apache-airflow/public-airflow-interface.rst
+++ b/docs/apache-airflow/public-airflow-interface.rst
@@ -18,9 +18,9 @@
 Public Interface of Airflow
 ...
 
-The Public Interface of Apache Airflow is a set of interfaces that allow 
developers to interact
-with and access certain features of the Apache Airflow system. This includes 
operations such as
-creating and managing DAGs (Directed Acyclic Graphs), managing tasks and their 
dependencies,
+The Public Interface of Apache Airflow is the collection of interfaces and 
behaviors in Apache Airflow
+whose changes are governed by semantic versioning. A user interacts with 
Airflow's public interface
+by creating and managing DAGs, managing tasks and dependencies,
 and extending Airflow capabilities by writing new executors, plugins, 
operators and providers. The
 Public Interface can be useful for building custom tools and integrations with 
other systems,
 and for automating certain aspects of the Airflow workflow.
@@ -28,17 +28,19 @@ and for automating certain aspects of the Airflow workflow.
 Using Airflow Public Interfaces
 ===
 
-Using Airflow Public Interfaces is needed when you want to interact with 
Airflow programmatically:
+The following are some examples of the public interface of Airflow:
 
-* When you are extending Airflow classes such as Operators and Hooks. This can 
be done by DAG authors to add missing functionality in their DAGs or by those 
who write reusable custom operators for other DAG authors.
+* When you are writing your own operators or hooks. This commonly done when no 
hook or operator exists for your use case, or when perhaps when one exis

(airflow) branch main updated: Remove unused index on task instance (#36737)

2024-01-12 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new e20b400317 Remove unused index on task instance (#36737)
e20b400317 is described below

commit e20b400317ae4eb41181c5b0cee466eff768b521
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Jan 12 20:32:14 2024 -0800

Remove unused index on task instance (#36737)

The index is only helpful for a user's custom query -- not for Airflow in
general (see comment
https://github.com/apache/airflow/pull/30762#issuecomment-1886658295).  We
noticed that this index had zero scans over a period of months, and it takes up
as much space as the table itself.  Since it's not generally useful, it doesn't
belong in Airflow OSS.

Reverts #30762
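
If your own reporting queries did benefit from this index, a minimal sketch of
re-creating it yourself is below (assuming Postgres, which supports the INCLUDE
clause the original migration used; the connection URL is a placeholder):

    import sqlalchemy as sa

    engine = sa.create_engine("postgresql://user:pass@host/airflow_db")  # placeholder URL
    with engine.begin() as conn:
        # Same name and columns as the reverted migration.
        conn.execute(sa.text(
            "CREATE INDEX ti_state_incl_start_date "
            "ON task_instance (dag_id, task_id, state) INCLUDE (start_date)"
        ))
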
---
 .../0126_2_7_0_add_index_to_task_instance_table.py | 16 +-
 ...e.py => 0133_2_8_1_refactor_dag_run_indexes.py} | 34 +++---
 airflow/models/taskinstance.py |  1 -
 airflow/utils/db.py|  1 +
 docs/apache-airflow/img/airflow_erd.sha256 |  2 +-
 docs/apache-airflow/img/airflow_erd.svg|  4 +--
 docs/apache-airflow/migrations-ref.rst |  4 ++-
 7 files changed, 33 insertions(+), 29 deletions(-)

diff --git 
a/airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py 
b/airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py
index 225776119e..6730611a8d 100644
--- a/airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py
+++ b/airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py
@@ -37,14 +37,16 @@ airflow_version = "2.7.0"
 
 def upgrade():
 """Apply Add index to task_instance table"""
-op.create_index(
-"ti_state_incl_start_date",
-"task_instance",
-["dag_id", "task_id", "state"],
-postgresql_include=["start_date"],
-)
+# We don't add this index anymore because it's not useful.
+pass
 
 
 def downgrade():
 """Unapply Add index to task_instance table"""
-op.drop_index("ti_state_incl_start_date", table_name="task_instance")
+# At 2.8.1 we removed this index as it is not used, and changed this 
migration not to add it
+# So we use drop if exists (cus it might not be there)
+import sqlalchemy
+from contextlib import suppress
+
+with suppress(sqlalchemy.exc.DatabaseError):  # mysql does not support 
drop if exists index
+op.drop_index("ti_state_incl_start_date", table_name="task_instance", 
if_exists=True)
diff --git 
a/airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py 
b/airflow/migrations/versions/0133_2_8_1_refactor_dag_run_indexes.py
similarity index 59%
copy from 
airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py
copy to airflow/migrations/versions/0133_2_8_1_refactor_dag_run_indexes.py
index 225776119e..43a24141ee 100644
--- a/airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py
+++ b/airflow/migrations/versions/0133_2_8_1_refactor_dag_run_indexes.py
@@ -16,35 +16,35 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Add index to task_instance table
+"""Drop unused TI index
 
-Revision ID: 937cbd173ca1
-Revises: c804e5c76e3e
-Create Date: 2023-05-03 11:31:32.527362
+Revision ID: 88344c1d9134
+Revises: 10b52ebd31f7
+Create Date: 2024-01-11 11:54:48.232030
 
 """
-from __future__ import annotations
 
+import sqlalchemy as sa
 from alembic import op
 
+
 # revision identifiers, used by Alembic.
-revision = "937cbd173ca1"
-down_revision = "c804e5c76e3e"
+revision = "88344c1d9134"
+down_revision = "10b52ebd31f7"
 branch_labels = None
 depends_on = None
-airflow_version = "2.7.0"
+airflow_version = "2.8.1"
 
 
 def upgrade():
-"""Apply Add index to task_instance table"""
-op.create_index(
-"ti_state_incl_start_date",
-"task_instance",
-["dag_id", "task_id", "state"],
-postgresql_include=["start_date"],
-)
+"""Apply refactor dag run indexes"""
+# This index may have been created in 2.7 but we've since removed it from 
migrations
+import sqlalchemy
+from contextlib import suppress
+
+with suppress(sqlalchemy.exc.DatabaseError):  # mysql does not support 
drop if exists index
+op.drop_index("ti_state_incl_start_date"

(airflow) branch main updated (127c0725b9 -> 949fc5788e)

2023-12-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 127c0725b9 Revert "Remove remaining Airflow 2.5 backcompat code from 
GCS Task Handler (#36443)" (#36453)
 add 949fc5788e Fix get_leaves calculation for teardown in nested group 
(#36456)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/task_group.py| 13 +++-
 tests/utils/test_task_group.py | 47 ++
 2 files changed, 59 insertions(+), 1 deletion(-)



(airflow) branch main updated: Deferrable Operators Docs Edits (#33620)

2023-12-07 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 657223c0fd Deferrable Operators Docs Edits (#33620)
657223c0fd is described below

commit 657223c0fd5460b1aa9fca8d96c2b4a17eb40ef9
Author: Laura Zdanski <25642903+lzdan...@users.noreply.github.com>
AuthorDate: Thu Dec 7 13:05:08 2023 -0500

Deferrable Operators Docs Edits (#33620)

Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
Co-authored-by: Daniel Standish 
<15932138+dstand...@users.noreply.github.com>
---
 .../authoring-and-scheduling/deferring.rst | 114 ++---
 1 file changed, 57 insertions(+), 57 deletions(-)

diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst 
b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
index c44ac2c1dd..adacf4b79d 100644
--- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -18,45 +18,48 @@
 Deferrable Operators & Triggers
 ===
 
-Standard :doc:`Operators ` and :doc:`Sensors 
<../core-concepts/sensors>` take up a full *worker slot* for the entire time 
they are running, even if they are idle; for example, if you only have 100 
worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor 
that's currently running but idle, then you *cannot run anything else* - even 
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for 
Sensors solves some of this, all [...]
+Standard :doc:`Operators ` and :doc:`Sensors 
<../core-concepts/sensors>` take up a full *worker slot* for the entire time 
they are running, even if they are idle. For example, if you only have 100 
worker slots available to run tasks, and you have 100 DAGs waiting on a sensor 
that's currently running but idle, then you *cannot run anything else* - even 
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for 
sensors solves some of this, by  [...]
 
-This is where *Deferrable Operators* come in. A deferrable operator is one 
that is written with the ability to suspend itself and free up the worker when 
it knows it has to wait, and hand off the job of resuming it to something 
called a *Trigger*. As a result, while it is suspended (deferred), it is not 
taking up a worker slot and your cluster will have a lot less resources wasted 
on idle Operators or Sensors. Note that by default deferred tasks will not use 
up pool slots, if you would l [...]
+This is where *Deferrable Operators* can be used. When it has nothing to do 
but wait, an operator can suspend itself and free up the worker for other 
processes by *deferring*. When an operator defers, execution moves to the 
triggerer, where the trigger specified by the operator will run.  The trigger 
can do the polling or waiting required by the operator. Then, when the trigger 
finishes polling or waiting, it sends a signal for the operator to resume its 
execution. During the deferred ph [...]
 
-*Triggers* are small, asynchronous pieces of Python code designed to be run 
all together in a single Python process; because they are asynchronous, they 
are able to all co-exist efficiently. As an overview of how this process works:
+*Triggers* are small, asynchronous pieces of Python code designed to run in a 
single Python process. Because they are asynchronous, they can all co-exist 
efficiently in the *triggerer* Airflow component.
 
-* A task instance (running operator) gets to a point where it has to wait, and 
defers itself with a trigger tied to the event that should resume it. This 
frees up the worker to run something else.
-* The new Trigger instance is registered inside Airflow, and picked up by a 
*triggerer* process
-* The trigger is run until it fires, at which point its source task is 
re-scheduled
-* The scheduler queues the task to resume on a worker node
+An overview of how this process works:
 
-Using deferrable operators as a DAG author is almost transparent; writing 
them, however, takes a bit more work.
+* A task instance (running operator) reaches a point where it has to wait for 
other operations or conditions, and defers itself with a trigger tied to an 
event to resume it. This frees up the worker to run something else.
+* The new trigger instance is registered by Airflow, and picked up by a 
triggerer process.
+* The trigger runs until it fires, at which point its source task is 
re-scheduled by the scheduler.
+* The scheduler queues the task to resume on a worker node.
 
+You can either use pre-written deferrable operators as a DAG author or write 
your own. Writing them, however, requires that they meet certain design 
criteria.
 
 Using Deferrable Operators
 --
 

(airflow) branch main updated: Use ExitStack to manage mutation of secrets_backend_list in dag.test (#34620)

2023-11-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 99b4eb769d Use ExitStack to manage mutation of secrets_backend_list in 
dag.test (#34620)
99b4eb769d is described below

commit 99b4eb769d2a3b6692de9c0d83ba64041abf5789
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Nov 27 11:07:10 2023 -0800

Use ExitStack to manage mutation of secrets_backend_list in dag.test 
(#34620)

Although it requires another indent, it's cleaner, and more importantly it 
makes sure that the mutation is undone after failure.
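
A minimal sketch of the pattern with stand-in names (not the dag.test code
itself): the callback registered on the ExitStack runs on exit whether or not
the body raises.

    from contextlib import ExitStack

    backends = ["env", "metastore"]           # stand-in for secrets_backend_list
    stack = ExitStack()
    backends.insert(0, "local-file")          # the temporary mutation
    stack.callback(lambda: backends.pop(0))   # undo runs on exit, success or failure
    with stack:
        ...                                   # body that may raise
    assert backends == ["env", "metastore"]
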
---
 airflow/models/dag.py | 103 +-
 1 file changed, 52 insertions(+), 51 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 27e8258a6d..5daa7bb805 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -32,6 +32,7 @@ import traceback
 import warnings
 import weakref
 from collections import deque
+from contextlib import ExitStack
 from datetime import datetime, timedelta
 from inspect import signature
 from typing import (
@@ -2797,63 +2798,63 @@ class DAG(LoggingMixin):
 self.log.debug("Adding Streamhandler to taskinstance %s", 
ti.task_id)
 ti.log.addHandler(handler)
 
+exit_stack = ExitStack()
 if conn_file_path or variable_file_path:
 local_secrets = LocalFilesystemBackend(
 variables_file_path=variable_file_path, 
connections_file_path=conn_file_path
 )
 secrets_backend_list.insert(0, local_secrets)
+exit_stack.callback(lambda: secrets_backend_list.pop(0))
+
+with exit_stack:
+execution_date = execution_date or timezone.utcnow()
+self.validate()
+self.log.debug("Clearing existing task instances for execution 
date %s", execution_date)
+self.clear(
+start_date=execution_date,
+end_date=execution_date,
+dag_run_state=False,  # type: ignore
+session=session,
+)
+self.log.debug("Getting dagrun for dag %s", self.dag_id)
+logical_date = timezone.coerce_datetime(execution_date)
+data_interval = 
self.timetable.infer_manual_data_interval(run_after=logical_date)
+dr: DagRun = _get_or_create_dagrun(
+dag=self,
+start_date=execution_date,
+execution_date=execution_date,
+run_id=DagRun.generate_run_id(DagRunType.MANUAL, 
execution_date),
+session=session,
+conf=run_conf,
+data_interval=data_interval,
+)
 
-execution_date = execution_date or timezone.utcnow()
-self.validate()
-self.log.debug("Clearing existing task instances for execution date 
%s", execution_date)
-self.clear(
-start_date=execution_date,
-end_date=execution_date,
-dag_run_state=False,  # type: ignore
-session=session,
-)
-self.log.debug("Getting dagrun for dag %s", self.dag_id)
-logical_date = timezone.coerce_datetime(execution_date)
-data_interval = 
self.timetable.infer_manual_data_interval(run_after=logical_date)
-dr: DagRun = _get_or_create_dagrun(
-dag=self,
-start_date=execution_date,
-execution_date=execution_date,
-run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
-session=session,
-conf=run_conf,
-data_interval=data_interval,
-)
-
-tasks = self.task_dict
-self.log.debug("starting dagrun")
-# Instead of starting a scheduler, we run the minimal loop possible to 
check
-# for task readiness and dependency management. This is notably faster
-# than creating a BackfillJob and allows us to surface logs to the user
-while dr.state == DagRunState.RUNNING:
-session.expire_all()
-schedulable_tis, _ = dr.update_state(session=session)
-for s in schedulable_tis:
-s.state = TaskInstanceState.SCHEDULED
-session.commit()
-# triggerer may mark tasks scheduled so we read from DB
-all_tis = set(dr.get_task_instances(session=session))
-scheduled_tis = {x for x in all_tis if x.state == 
TaskInstanceState.SCHEDULED}
-ids_unrunnable = {x for x in all_tis if x.state not in 
State.finished} - scheduled_tis
-if not scheduled_tis and ids_unrunnable:
-self.log.warning("No tasks to run. unrunnable tasks: %s", 
ids_unrunnable)
-

(airflow) branch main updated: Run triggers inline with dag test (#34642)

2023-11-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 7b37a785d0 Run triggers inline with dag test (#34642)
7b37a785d0 is described below

commit 7b37a785d0b74d1e83c7ce84729febffd6e26821
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Nov 27 06:48:17 2023 -0800

Run triggers inline with dag test (#34642)

No need to have a triggerer running -- the triggers will just be run inline, asynchronously.
---
 airflow/models/dag.py  | 68 +---
 airflow/models/taskinstance.py |  3 ++
 tests/cli/commands/test_dag_command.py | 81 --
 tests/models/test_mappedoperator.py|  2 +-
 4 files changed, 81 insertions(+), 73 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 26c83754a8..27e8258a6d 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -17,7 +17,8 @@
 # under the License.
 from __future__ import annotations
 
-import collections.abc
+import asyncio
+import collections
 import copy
 import functools
 import itertools
@@ -82,11 +83,11 @@ from airflow.datasets.manager import dataset_manager
 from airflow.exceptions import (
 AirflowDagInconsistent,
 AirflowException,
-AirflowSkipException,
 DuplicateTaskIdFound,
 FailStopDagInvalidTriggerRule,
 ParamValidationError,
 RemovedInAirflow3Warning,
+TaskDeferred,
 TaskNotFound,
 )
 from airflow.jobs.job import run_job
@@ -101,7 +102,6 @@ from airflow.models.taskinstance import (
 Context,
 TaskInstance,
 TaskInstanceKey,
-TaskReturnCode,
 clear_task_instances,
 )
 from airflow.secrets.local_filesystem import LocalFilesystemBackend
@@ -285,12 +285,11 @@ def get_dataset_triggered_next_run_info(
 }
 
 
-class _StopDagTest(Exception):
-"""
-Raise when DAG.test should stop immediately.
+def _triggerer_is_healthy():
+from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 
-:meta private:
-"""
+job = TriggererJobRunner.most_recent_job()
+return job and job.is_alive()
 
 
 @functools.total_ordering
@@ -2844,21 +2843,12 @@ class DAG(LoggingMixin):
 if not scheduled_tis and ids_unrunnable:
 self.log.warning("No tasks to run. unrunnable tasks: %s", 
ids_unrunnable)
 time.sleep(1)
+triggerer_running = _triggerer_is_healthy()
 for ti in scheduled_tis:
 try:
 add_logger_if_needed(ti)
 ti.task = tasks[ti.task_id]
-ret = _run_task(ti, session=session)
-if ret is TaskReturnCode.DEFERRED:
-if not _triggerer_is_healthy():
-raise _StopDagTest(
-"Task has deferred but triggerer component is 
not running. "
-"You can start the triggerer by running 
`airflow triggerer` in a terminal."
-)
-except _StopDagTest:
-# Let this exception bubble out and not be swallowed by the
-# except block below.
-raise
+_run_task(ti=ti, inline_trigger=not triggerer_running, 
session=session)
 except Exception:
 self.log.exception("Task failed; ti=%s", ti)
 if conn_file_path or variable_file_path:
@@ -3992,14 +3982,15 @@ class DagContext:
 return None
 
 
-def _triggerer_is_healthy():
-from airflow.jobs.triggerer_job_runner import TriggererJobRunner
+def _run_trigger(trigger):
+async def _run_trigger_main():
+async for event in trigger.run():
+return event
 
-job = TriggererJobRunner.most_recent_job()
-return job and job.is_alive()
+return asyncio.run(_run_trigger_main())
 
 
-def _run_task(ti: TaskInstance, session) -> TaskReturnCode | None:
+def _run_task(*, ti: TaskInstance, inline_trigger: bool = False, session: 
Session):
 """
 Run a single task instance, and push result to Xcom for downstream tasks.
 
@@ -4009,20 +4000,21 @@ def _run_task(ti: TaskInstance, session) -> 
TaskReturnCode | None:
 Args:
 ti: TaskInstance to run
 """
-ret = None
-log.info("*")
-if ti.map_index > 0:
-log.info("Running task %s index %d", ti.task_id, ti.map_index)
-else:
-log.info("Running task %s", ti.task_id)
-try:
-ret = ti._run_raw_task(session=session)
-session.flush()
-log.info("%s ran successfully!", ti.task_id)
-except

(airflow) branch main updated: Check attr on parent not self re TaskContextLogger set_context (#35780)

2023-11-21 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 2a06e278d2 Check attr on parent not self re TaskContextLogger 
set_context (#35780)
2a06e278d2 is described below

commit 2a06e278d290e36e861bd3c40fdc9318e620aa16
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Nov 21 09:55:16 2023 -0800

Check attr on parent not self re TaskContextLogger set_context (#35780)

To know whether we should supply the `identifier` param, we need to check the
parent class.
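
A hedged illustration with toy classes (not the real handlers): the subclass may
declare the flag itself, but what matters is whether the installed
FileTaskHandler -- possibly from an older Airflow -- defines it, and that is
what looking through super() checks.

    class Base:                                # stand-in for an older FileTaskHandler
        pass

    class Handler(Base):                       # stand-in for e.g. S3TaskHandler
        supports_task_context_logging = True   # declared on the subclass

    h = Handler()
    print(getattr(h, "supports_task_context_logging", False))                  # True (misleading)
    print(getattr(super(Handler, h), "supports_task_context_logging", False))  # False (parent lacks it)
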
---
 airflow/providers/amazon/aws/log/s3_task_handler.py| 4 +++-
 airflow/providers/elasticsearch/log/es_task_handler.py | 4 +++-
 airflow/providers/google/cloud/log/gcs_task_handler.py | 4 +++-
 airflow/providers/microsoft/azure/log/wasb_task_handler.py | 4 +++-
 4 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py 
b/airflow/providers/amazon/aws/log/s3_task_handler.py
index 761c4ce463..f3664f7c41 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -78,7 +78,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
 )
 
 def set_context(self, ti: TaskInstance, *, identifier: str | None = None) 
-> None:
-if getattr(self, "supports_task_context_logging", False):
+# todo: remove-at-min-airflow-version-2.8
+#   after Airflow 2.8 can always pass `identifier`
+if getattr(super(), "supports_task_context_logging", False):
 super().set_context(ti, identifier=identifier)
 else:
 super().set_context(ti)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py 
b/airflow/providers/elasticsearch/log/es_task_handler.py
index 1e8c75b7e3..c9d3a180e1 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -443,7 +443,9 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
 self.handler.setLevel(self.level)
 self.handler.setFormatter(self.formatter)
 else:
-if getattr(self, "supports_task_context_logging", False):
+# todo: remove-at-min-airflow-version-2.8
+#   after Airflow 2.8 can always pass `identifier`
+if getattr(super(), "supports_task_context_logging", False):
 super().set_context(ti, identifier=identifier)
 else:
 super().set_context(ti)
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py 
b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 39d0f072a8..9921bb8753 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -142,7 +142,9 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
 )
 
 def set_context(self, ti: TaskInstance, *, identifier: str | None = None) 
-> None:
-if getattr(self, "supports_task_context_logging", False):
+# todo: remove-at-min-airflow-version-2.8
+#   after Airflow 2.8 can always pass `identifier`
+if getattr(super(), "supports_task_context_logging", False):
 super().set_context(ti, identifier=identifier)
 else:
 super().set_context(ti)
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py 
b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index f3a00e8432..c57de1acb1 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -96,7 +96,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
 return None
 
 def set_context(self, ti: TaskInstance, *, identifier: str | None = None) 
-> None:
-if getattr(self, "supports_task_context_logging", False):
+# todo: remove-at-min-airflow-version-2.8
+#   after Airflow 2.8 can always pass `identifier`
+if getattr(super(), "supports_task_context_logging", False):
 super().set_context(ti, identifier=identifier)
 else:
 super().set_context(ti)



(airflow) branch main updated: Set mark_end_on_close after set_context (#35761)

2023-11-21 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 7389782fb8 Set mark_end_on_close after set_context (#35761)
7389782fb8 is described below

commit 7389782fb8e441bca4ca1a3f811dd20ad098e0a4
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Nov 21 05:42:18 2023 -0800

Set mark_end_on_close after set_context (#35761)

In the ES task handler, set_context applies its own logic for
mark_end_on_close, so we must set the attr afterwards for our override to persist.
---
 airflow/utils/log/task_context_logger.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/utils/log/task_context_logger.py 
b/airflow/utils/log/task_context_logger.py
index 0661789f5b..84ed207e3a 100644
--- a/airflow/utils/log/task_context_logger.py
+++ b/airflow/utils/log/task_context_logger.py
@@ -98,9 +98,9 @@ class TaskContextLogger:
 
 task_handler = copy(self.task_handler)
 try:
+task_handler.set_context(ti, identifier=self.component_name)
 if hasattr(task_handler, "mark_end_on_close"):
 task_handler.mark_end_on_close = False
-task_handler.set_context(ti, identifier=self.component_name)
 filename, lineno, func, stackinfo = logger.findCaller()
 record = logging.LogRecord(
 self.component_name, level, filename, lineno, msg, args, None, 
func=func



(airflow) branch main updated: Remove inconsequential code bits in KPO logging (#35416)

2023-11-13 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 486ccba4cf Remove inconsequential code bits in KPO logging (#35416)
486ccba4cf is described below

commit 486ccba4cfc373f2864ad1c88ac5093988e61a73
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Nov 13 10:36:51 2023 -0800

Remove inconsequential code bits in KPO logging (#35416)

We were passing around the `logs` variable (actually a PodLogsConsumer object)
for no reason, and the `termination_timeout` param was not used.  It seems that
the passing around of the object was first introduced in #34127, then perhaps
accidentally removed in #34127.  Someone may want to try to re-introduce the
original fix.
---
 airflow/providers/cncf/kubernetes/utils/pod_manager.py | 18 +++---
 1 file changed, 3 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 856f5fa19b..b215d7b68a 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -385,13 +385,7 @@ class PodManager(LoggingMixin):
 :meta private:
 """
 
-def consume_logs(
-*,
-since_time: DateTime | None = None,
-follow: bool = True,
-termination_timeout: int = 120,
-logs: PodLogsConsumer | None,
-) -> tuple[DateTime | None, PodLogsConsumer | None]:
+def consume_logs(*, since_time: DateTime | None = None) -> DateTime | 
None:
 """
 Try to follow container logs until container completes.
 
@@ -448,20 +442,14 @@ class PodManager(LoggingMixin):
 "Reading of logs interrupted for container %r; will 
retry.",
 container_name,
 )
-return last_captured_timestamp or since_time, logs
+return last_captured_timestamp or since_time
 
 # note: `read_pod_logs` follows the logs, so we shouldn't necessarily 
*need* to
 # loop as we do here. But in a long-running process we might 
temporarily lose connectivity.
 # So the looping logic is there to let us resume following the logs.
-logs = None
 last_log_time = since_time
 while True:
-last_log_time, logs = consume_logs(
-since_time=last_log_time,
-follow=follow,
-termination_timeout=post_termination_timeout,
-logs=logs,
-)
+last_log_time = consume_logs(since_time=last_log_time)
 if not follow:
 return
 if self.container_is_running(pod, container_name=container_name):



(airflow) branch main updated: Helm chart should set AIRFLOW_HOME from airflowHome (#34839)

2023-11-07 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 71e0e6e76b Helm chart should set AIRFLOW_HOME from airflowHome (#34839)
71e0e6e76b is described below

commit 71e0e6e76bcdc36f5a242c5d6412af6677f99d53
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Nov 7 13:07:54 2023 -0800

Helm chart should set AIRFLOW_HOME from airflowHome (#34839)
---
 chart/templates/_helpers.yaml |  2 ++
 helm_tests/airflow_aux/test_airflow_common.py |  2 ++
 helm_tests/airflow_core/test_env.py   | 42 +++
 3 files changed, 46 insertions(+)

diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml
index c8b41d8c29..d7b3c43079 100644
--- a/chart/templates/_helpers.yaml
+++ b/chart/templates/_helpers.yaml
@@ -60,6 +60,8 @@ If release name contains chart name it will be used as a full 
name.
 name: {{ template "fernet_key_secret" . }}
 key: fernet-key
   {{- end }}
+  - name: AIRFLOW_HOME
+value: {{ .Values.airflowHome }}
   # For Airflow <2.3, backward compatibility; moved to [database] in 2.3
   {{- if .Values.enableBuiltInSecretEnvVars.AIRFLOW__CORE__SQL_ALCHEMY_CONN }}
   - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
diff --git a/helm_tests/airflow_aux/test_airflow_common.py 
b/helm_tests/airflow_aux/test_airflow_common.py
index e25b96e5a1..5257b490d2 100644
--- a/helm_tests/airflow_aux/test_airflow_common.py
+++ b/helm_tests/airflow_aux/test_airflow_common.py
@@ -331,6 +331,7 @@ class TestAirflowCommon:
 )
 expected_vars = [
 "AIRFLOW__CORE__FERNET_KEY",
+"AIRFLOW_HOME",
 "AIRFLOW_CONN_AIRFLOW_DB",
 "AIRFLOW__CELERY__BROKER_URL",
 ]
@@ -355,6 +356,7 @@ class TestAirflowCommon:
 )
 expected_vars = [
 "AIRFLOW__CORE__FERNET_KEY",
+"AIRFLOW_HOME",
 "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
 "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN",
 "AIRFLOW_CONN_AIRFLOW_DB",
diff --git a/helm_tests/airflow_core/test_env.py 
b/helm_tests/airflow_core/test_env.py
new file mode 100644
index 00..5a5ee6580a
--- /dev/null
+++ b/helm_tests/airflow_core/test_env.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import jmespath
+
+from tests.charts.helm_template_generator import render_chart
+
+
+def test_should_add_airflow_home():
+exp_path = "/not/even/a/real/path"
+docs = render_chart(
+values={"airflowHome": exp_path},
+show_only=["templates/webserver/webserver-deployment.yaml"],
+)
+assert {"name": "AIRFLOW_HOME", "value": exp_path} in jmespath.search(
+"spec.template.spec.containers[0].env", docs[0]
+)
+
+
+def test_should_add_airflow_home_notset():
+docs = render_chart(
+values={},
+show_only=["templates/webserver/webserver-deployment.yaml"],
+)
+assert {"name": "AIRFLOW_HOME", "value": "/opt/airflow"} in 
jmespath.search(
+"spec.template.spec.containers[0].env", docs[0]
+)



(airflow) branch main updated: Remove tenacity on KPO logs inner func consume_logs (#35504)

2023-11-07 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d6c79ce340 Remove tenacity on KPO logs inner func consume_logs 
(#35504)
d6c79ce340 is described below

commit d6c79ce340dd4cd088edfa92ed052d643ae3587d
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Nov 7 10:41:25 2023 -0800

Remove tenacity on KPO logs inner func consume_logs (#35504)

There are many overlapping layers and strategies of retrying in this area 
of code.  It appears this particular layer may be unnecessary.  See discussion 
starting at 
https://github.com/apache/airflow/pull/31622#issuecomment-1793124398.
---
 airflow/providers/cncf/kubernetes/utils/pod_manager.py | 5 -
 1 file changed, 5 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 75ff82fc2f..856f5fa19b 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -385,11 +385,6 @@ class PodManager(LoggingMixin):
 :meta private:
 """
 
-@tenacity.retry(
-retry=tenacity.retry_if_exception_type(ApiException),
-stop=tenacity.stop_after_attempt(10),
-wait=tenacity.wait_fixed(1),
-)
 def consume_logs(
 *,
 since_time: DateTime | None = None,



(airflow) branch main updated: Simplify KPO multi container log reconciliation logic (#35450)

2023-11-07 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 68b3b7b468 Simplify KPO multi container log reconciliation logic 
(#35450)
68b3b7b468 is described below

commit 68b3b7b4683c8e06098dfa8820be18f253d55f47
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Nov 7 10:22:24 2023 -0800

Simplify KPO multi container log reconciliation logic (#35450)

Easier to follow this way.
---
 airflow/providers/cncf/kubernetes/operators/pod.py |  2 +-
 .../providers/cncf/kubernetes/utils/pod_manager.py | 69 ++
 .../cncf/kubernetes/operators/test_pod.py  |  4 +-
 .../cncf/kubernetes/utils/test_pod_manager.py  |  2 +-
 4 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 5b58b1bedb..58e54a72d3 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -611,7 +611,7 @@ class KubernetesPodOperator(BaseOperator):
 if self.get_logs:
 self.pod_manager.fetch_requested_container_logs(
 pod=self.pod,
-container_logs=self.container_logs,
+containers=self.container_logs,
 follow_logs=True,
 )
 if not self.get_logs or (
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 2d5abec0e9..75ff82fc2f 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -381,6 +381,8 @@ class PodManager(LoggingMixin):
 Between when the pod starts and logs being available, there might be a 
delay due to CSR not approved
 and signed yet. In such situation, ApiException is thrown. This is why 
we are retrying on this
 specific exception.
+
+:meta private:
 """
 
 @tenacity.retry(
@@ -476,53 +478,68 @@ class PodManager(LoggingMixin):
 else:  # follow requested, but container is done
 break
 
-def fetch_requested_container_logs(
-self, pod: V1Pod, container_logs: Iterable[str] | str | Literal[True], 
follow_logs=False
-) -> None:
-"""
-Follow the logs of containers in the specified pod and publish it to 
airflow logging.
-
-Returns when all the containers exit.
-"""
-all_containers = self.get_container_names(pod)
-if all_containers:
-if isinstance(container_logs, str):
+def _reconcile_requested_log_containers(
+self, requested: Iterable[str] | str | bool, actual: list[str], 
pod_name
+) -> list[str]:
+"""Return actual containers based on requested."""
+containers_to_log = []
+if actual:
+if isinstance(requested, str):
 # fetch logs only for requested container if only one 
container is provided
-if container_logs in all_containers:
-self.fetch_container_logs(pod=pod, 
container_name=container_logs, follow=follow_logs)
+if requested in actual:
+containers_to_log.append(requested)
 else:
 self.log.error(
 "container %s whose logs were requested not found in 
the pod %s",
-container_logs,
-pod.metadata.name,
+requested,
+pod_name,
 )
-elif isinstance(container_logs, bool):
+elif isinstance(requested, bool):
 # if True is provided, get logs for all the containers
-if container_logs is True:
-for container_name in all_containers:
-self.fetch_container_logs(pod=pod, 
container_name=container_name, follow=follow_logs)
+if requested is True:
+containers_to_log.extend(actual)
 else:
 self.log.error(
 "False is not a valid value for container_logs",
 )
 else:
 # if a sequence of containers are provided, iterate for every 
container in the pod
-if isinstance(container_logs, Iterable):
-for container in container_logs:
-if container in all_containers:
-self.fetch_container_logs(pod=pod, 
container_name=container, follow=follow

(airflow) branch main updated: Remove before_log in KPO retry and add traceback when interrupted (#35423)

2023-11-03 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 2023a76037 Remove before_log in KPO retry and add traceback when 
interrupted (#35423)
2023a76037 is described below

commit 2023a76037f07a2003ace1d1a7497019316db7a6
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Nov 3 18:55:31 2023 -0700

Remove before_log in KPO retry and add traceback when interrupted (#35423)

Two small logging "fixes":

1. Usually the tenacity retry will not be invoked, so there is no need to add
confusing log messages when there's not actually a retry.
2. When the log read is interrupted and the pod is still running, something
exceptional has happened. Let's just include the traceback rather than forcing
the user to enable debug logging.
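
A tiny sketch of the stdlib behavior relied on here: logger.exception() logs at
ERROR level and appends the active traceback, so no DEBUG switch is needed (the
exception below is just a stand-in for BaseHTTPError):

    import logging

    logging.basicConfig()
    log = logging.getLogger("pod_manager_demo")
    try:
        raise ConnectionError("stream dropped")
    except ConnectionError:
        log.exception("Reading of logs interrupted for container %r; will retry.", "base")
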
---
 airflow/providers/cncf/kubernetes/utils/pod_manager.py | 16 +++-
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 8e8fbe132d..aff4970cd9 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -20,7 +20,6 @@ from __future__ import annotations
 import enum
 import itertools
 import json
-import logging
 import math
 import time
 import warnings
@@ -37,7 +36,6 @@ from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream as kubernetes_stream
 from pendulum import DateTime
 from pendulum.parsing.exceptions import ParserError
-from tenacity import before_log
 from typing_extensions import Literal
 from urllib3.exceptions import HTTPError as BaseHTTPError
 
@@ -392,7 +390,6 @@ class PodManager(LoggingMixin):
 retry=tenacity.retry_if_exception_type(ApiException),
 stop=tenacity.stop_after_attempt(10),
 wait=tenacity.wait_fixed(1),
-before=before_log(self.log, logging.INFO),
 )
 def consume_logs(
 *,
@@ -452,17 +449,10 @@ class PodManager(LoggingMixin):
 self._progress_callback(line)
 self.log.info("[%s] %s", container_name, message_to_log)
 last_captured_timestamp = message_timestamp
-except BaseHTTPError as e:
-self.log.warning(
-"Reading of logs interrupted for container %r with error 
%r; will retry. "
-"Set log level to DEBUG for traceback.",
+except BaseHTTPError:
+self.log.exception(
+"Reading of logs interrupted for container %r; will 
retry.",
 container_name,
-e,
-)
-self.log.debug(
-"Traceback for interrupted logs read for pod %r",
-pod.metadata.name,
-exc_info=True,
 )
 return last_captured_timestamp or since_time, logs
 



[airflow] branch main updated: Remove warning about max_tis per query > parallelism (#34742)

2023-10-04 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 49bb45ca86 Remove warning about max_tis per query > parallelism 
(#34742)
49bb45ca86 is described below

commit 49bb45ca8662fff9136fc218d9e242bdce1a0afe
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Oct 4 10:29:04 2023 -0700

Remove warning about max_tis per query > parallelism (#34742)

This warning is erroneous because it is not true in all cases that the
max_tis_per_query setting is disregarded when it is greater than parallelism.

See discussion at 
https://github.com/apache/airflow/pull/32572#issuecomment-1739787276.
---
 airflow/configuration.py | 21 -
 tests/core/test_configuration.py | 16 
 2 files changed, 37 deletions(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 0107cb1021..ab26bbbecf 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -717,7 +717,6 @@ class AirflowConfigParser(ConfigParser):
 def validate(self):
 self._validate_sqlite3_version()
 self._validate_enums()
-self._validate_max_tis_per_query()
 
 for section, replacement in self.deprecated_values.items():
 for name, info in replacement.items():
@@ -739,26 +738,6 @@ class AirflowConfigParser(ConfigParser):
 self._upgrade_postgres_metastore_conn()
 self.is_validated = True
 
-def _validate_max_tis_per_query(self) -> None:
-"""
-Check if config ``scheduler.max_tis_per_query`` is not greater than 
``core.parallelism``.
-
-If not met, a warning message is printed to guide the user to correct 
it.
-
-More info: https://github.com/apache/airflow/pull/32572
-"""
-max_tis_per_query = self.getint("scheduler", "max_tis_per_query")
-parallelism = self.getint("core", "parallelism")
-
-if max_tis_per_query > parallelism:
-warnings.warn(
-f"Config scheduler.max_tis_per_query (value: 
{max_tis_per_query}) "
-f"should NOT be greater than core.parallelism (value: 
{parallelism}). "
-"Will now use core.parallelism as the max task instances per 
query "
-"instead of specified value.",
-UserWarning,
-)
-
 def _upgrade_auth_backends(self):
 """
 Ensure a custom auth_backends setting contains session.
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index 2c6ad3ecfa..8137f1031d 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -760,22 +760,6 @@ notacommand = OK
 )
 assert message == exception
 
-@mock.patch.dict(
-"os.environ",
-{
-"AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY": "200",
-"AIRFLOW__CORE__PARALLELISM": "100",
-},
-)
-def test_max_tis_per_query_too_high(self):
-test_conf = AirflowConfigParser()
-
-with pytest.warns(UserWarning) as ctx:
-test_conf._validate_max_tis_per_query()
-
-captured_warnings_msg = str(ctx.pop().message)
-assert "max_tis_per_query" in captured_warnings_msg and 
"core.parallelism" in captured_warnings_msg
-
 def test_as_dict_works_without_sensitive_cmds(self):
 conf_materialize_cmds = conf.as_dict(display_sensitive=True, raw=True, 
include_cmds=True)
 conf_maintain_cmds = conf.as_dict(display_sensitive=True, raw=True, 
include_cmds=False)



[airflow] branch main updated: Fix typo re schedule vs schedule_interval (#34743)

2023-10-03 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 08bfa08273 Fix typo re schedule vs schedule_interval (#34743)
08bfa08273 is described below

commit 08bfa08273822ce18e01f70f9929130735022583
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Oct 3 21:25:30 2023 -0700

Fix typo re schedule vs schedule_interval (#34743)

Now, typically, the "schedule interval" expression is passed via the "schedule"
param, since "schedule_interval" was deprecated.  So we should not use the
underscored name here (a remnant of the old var name) but just use either
"interval" or "schedule interval", referring either to the concept (not the
param name) or to the actual local function argument.
---
 airflow/models/dag.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6f018fe8da..c738f55e27 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -225,7 +225,7 @@ def create_timetable(interval: ScheduleIntervalArg, 
timezone: Timezone) -> Timet
 return DeltaDataIntervalTimetable(interval)
 if isinstance(interval, str):
 return CronDataIntervalTimetable(interval, timezone)
-raise ValueError(f"{interval!r} is not a valid schedule_interval.")
+raise ValueError(f"{interval!r} is not a valid interval.")
 
 
 def get_last_dagrun(dag_id, session, include_externally_triggered=False):



[airflow] branch main updated (6618c5f90d -> c3a8828eba)

2023-10-02 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 6618c5f90d Fix broken breeze by fixing package version (#34701)
 add c3a8828eba Mark tasks with `all_skipped` trigger rule as `skipped` if 
any task is in `upstream_failed` state (#34392)

No new revisions were added by this update.

Summary of changes:
 airflow/ti_deps/deps/trigger_rule_dep.py|  2 +-
 tests/ti_deps/deps/test_trigger_rule_dep.py | 24 
 2 files changed, 25 insertions(+), 1 deletion(-)



[airflow] branch main updated: Resolve pydantic deprecation warnings re `update_forward_refs` (#34657)

2023-09-28 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 729e5fb6c6 Resolve pydantic deprecation warnings re 
`update_forward_refs` (#34657)
729e5fb6c6 is described below

commit 729e5fb6c691859559c7b6f03fb53ae82120d103
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Sep 28 09:12:49 2023 -0700

Resolve pydantic deprecation warnings re `update_forward_refs` (#34657)

Resolves warnings such as these:

airflow/serialization/pydantic/dag_run.py:126 PydanticDeprecatedSince20: 
The `update_forward_refs` method is deprecated; use `model_rebuild` instead. 
Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration 
Guide at https://errors.pydantic.dev/2.4/migration/
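
A minimal sketch of the v2 pattern with toy models (not the Airflow ones): the
forward reference stays unresolved at class-creation time, and model_rebuild()
resolves it once the referenced model exists.

    from __future__ import annotations

    from pydantic import BaseModel

    class DagRunModel(BaseModel):
        tis: list[TaskInstanceModel] = []

    class TaskInstanceModel(BaseModel):
        task_id: str

    DagRunModel.model_rebuild()   # Pydantic v2 replacement for update_forward_refs()
    print(DagRunModel(tis=[{"task_id": "t1"}]))
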
---
 airflow/serialization/pydantic/dag_run.py  | 2 +-
 airflow/serialization/pydantic/taskinstance.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/serialization/pydantic/dag_run.py 
b/airflow/serialization/pydantic/dag_run.py
index 9940e89d2a..a82574cc2d 100644
--- a/airflow/serialization/pydantic/dag_run.py
+++ b/airflow/serialization/pydantic/dag_run.py
@@ -123,4 +123,4 @@ class DagRunPydantic(BaseModelPydantic):
 )
 
 
-DagRunPydantic.update_forward_refs()
+DagRunPydantic.model_rebuild()
diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index 949f2cfccf..b53cc1350a 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -346,4 +346,4 @@ class TaskInstancePydantic(BaseModelPydantic):
 return _get_previous_ti(task_instance=self, state=state, 
session=session)
 
 
-TaskInstancePydantic.update_forward_refs()
+TaskInstancePydantic.model_rebuild()



[airflow] branch main updated: Fail dag test if defer without triggerer (#34619)

2023-09-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new e81bb48779 Fail dag test if defer without triggerer (#34619)
e81bb48779 is described below

commit e81bb487796780705f6df984fbfed04f555943d7
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Sep 27 09:25:46 2023 -0700

Fail dag test if defer without triggerer (#34619)

If the user runs dag.test and a task defers while no triggerer is running, we
should fail so the user does not sit there waiting forever.
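
For illustration (a hypothetical DAG, not part of this commit), the
situation the check guards against looks roughly like this:

    import datetime
    import pendulum
    from airflow.models.dag import DAG
    from airflow.sensors.time_sensor import TimeSensorAsync

    with DAG(dag_id="defer_example", schedule=None,
             start_date=pendulum.datetime(2023, 1, 1)) as dag:
        # deferrable sensor: hands itself off to the triggerer
        TimeSensorAsync(task_id="wait", target_time=datetime.time(23, 59))

    # With no `airflow triggerer` process running, this previously waited
    # forever; now it fails with "Task has deferred but triggerer component
    # is not running. You can start the triggerer by running `airflow
    # triggerer` in a terminal."
    dag.test()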

-

Co-authored-by: Tzu-ping Chung 
---
 airflow/models/dag.py  | 41 ++
 tests/cli/commands/test_dag_command.py | 37 ++
 2 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index da734152c8..fdb69aea96 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -97,7 +97,13 @@ from airflow.models.dagcode import DagCode
 from airflow.models.dagpickle import DagPickle
 from airflow.models.dagrun import RUN_ID_REGEX, DagRun
 from airflow.models.param import DagParam, ParamsDict
-from airflow.models.taskinstance import Context, TaskInstance, 
TaskInstanceKey, clear_task_instances
+from airflow.models.taskinstance import (
+Context,
+TaskInstance,
+TaskInstanceKey,
+TaskReturnCode,
+clear_task_instances,
+)
 from airflow.secrets.local_filesystem import LocalFilesystemBackend
 from airflow.security import permissions
 from airflow.stats import Stats
@@ -279,6 +285,14 @@ def get_dataset_triggered_next_run_info(
 }
 
 
+class _StopDagTest(Exception):
+"""
+Raise when DAG.test should stop immediately.
+
+:meta private:
+"""
+
+
 @functools.total_ordering
 class DAG(LoggingMixin):
 """
@@ -2828,7 +2842,17 @@ class DAG(LoggingMixin):
 try:
 add_logger_if_needed(ti)
 ti.task = tasks[ti.task_id]
-_run_task(ti, session=session)
+ret = _run_task(ti, session=session)
+if ret is TaskReturnCode.DEFERRED:
+if not _triggerer_is_healthy():
+raise _StopDagTest(
+"Task has deferred but triggerer component is 
not running. "
+"You can start the triggerer by running 
`airflow triggerer` in a terminal."
+)
+except _StopDagTest:
+# Let this exception bubble out and not be swallowed by the
+# except block below.
+raise
 except Exception:
 self.log.exception("Task failed; ti=%s", ti)
 if conn_file_path or variable_file_path:
@@ -3957,7 +3981,14 @@ class DagContext:
 return None
 
 
-def _run_task(ti: TaskInstance, session):
+def _triggerer_is_healthy():
+from airflow.jobs.triggerer_job_runner import TriggererJobRunner
+
+job = TriggererJobRunner.most_recent_job()
+return job and job.is_alive()
+
+
+def _run_task(ti: TaskInstance, session) -> TaskReturnCode | None:
 """
 Run a single task instance, and push result to Xcom for downstream tasks.
 
@@ -3967,18 +3998,20 @@ def _run_task(ti: TaskInstance, session):
 Args:
 ti: TaskInstance to run
 """
+ret = None
 log.info("*")
 if ti.map_index > 0:
 log.info("Running task %s index %d", ti.task_id, ti.map_index)
 else:
 log.info("Running task %s", ti.task_id)
 try:
-ti._run_raw_task(session=session)
+ret = ti._run_raw_task(session=session)
 session.flush()
 log.info("%s ran successfully!", ti.task_id)
 except AirflowSkipException:
 log.info("Task Skipped, continuing")
 log.info("*")
+return ret
 
 
 def _get_or_create_dagrun(
diff --git a/tests/cli/commands/test_dag_command.py 
b/tests/cli/commands/test_dag_command.py
index 6fb589a3ed..0e4a5490b6 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -33,9 +33,13 @@ from airflow import settings
 from airflow.api_connexion.schemas.dag_schema import DAGSchema
 from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
+from airflow.decorators import task
 from airflow.exceptions import AirflowException
 from airflow.models import DagBag, DagModel, DagRun
+from airflow.models.baseoperator import Base

[airflow] branch main updated: Clarify what landing time means in doc (#34608)

2023-09-25 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new f99e65b4f3 Clarify what landing time means in doc (#34608)
f99e65b4f3 is described below

commit f99e65b4f3df71baf2fdf738643bbda1263e15a0
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Sep 25 14:52:02 2023 -0700

Clarify what landing time means in doc (#34608)
---
 docs/apache-airflow/ui.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/apache-airflow/ui.rst b/docs/apache-airflow/ui.rst
index 87f20d7a88..7afe26cf3d 100644
--- a/docs/apache-airflow/ui.rst
+++ b/docs/apache-airflow/ui.rst
@@ -166,9 +166,9 @@ DAG over many runs.
 
 Landing Times
 .
-Airflow landing times are calculated from the task's scheduled time to
-the time the task finishes, either with success or another state (see
-:ref:`concepts:task-instances`).
+
+The landing time for a task instance is the delta between the dag run's data 
interval end
+(typically this means, when the dag "should" run) and the task instance 
completion time.
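
As a rough illustration (not part of the patch), assuming a finished task
instance `ti` and its dag run `dr`:

    # landing time under this definition: how long after the point at which
    # the run "should" have run did the task instance actually finish
    landing_time = ti.end_date - dr.data_interval_end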
 
 
 



[airflow] branch main updated: Support deferrable operators in dag test (#34585)

2023-09-25 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 2035dc71be Support deferrable operators in dag test (#34585)
2035dc71be is described below

commit 2035dc71be68b6d8dcf4536370c6e21ee0fd9585
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Sep 25 09:27:03 2023 -0700

Support deferrable operators in dag test (#34585)

You need to run the triggerer separately, but now at least dag test will resume
the task after deferral.
---
 airflow/models/dag.py  | 16 ++--
 airflow/models/taskinstance.py |  3 +++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 4baf2dbf32..da734152c8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -27,6 +27,7 @@ import os
 import pathlib
 import pickle
 import sys
+import time
 import traceback
 import warnings
 import weakref
@@ -123,7 +124,7 @@ from airflow.utils.sqlalchemy import (
 tuple_in_condition,
 with_row_locks,
 )
-from airflow.utils.state import DagRunState, TaskInstanceState
+from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType
 
@@ -2811,8 +2812,19 @@ class DAG(LoggingMixin):
 # for task readiness and dependency management. This is notably faster
 # than creating a BackfillJob and allows us to surface logs to the user
 while dr.state == DagRunState.RUNNING:
+session.expire_all()
 schedulable_tis, _ = dr.update_state(session=session)
-for ti in schedulable_tis:
+for s in schedulable_tis:
+s.state = TaskInstanceState.SCHEDULED
+session.commit()
+# triggerer may mark tasks scheduled so we read from DB
+all_tis = set(dr.get_task_instances(session=session))
+scheduled_tis = {x for x in all_tis if x.state == 
TaskInstanceState.SCHEDULED}
+ids_unrunnable = {x for x in all_tis if x.state not in 
State.finished} - scheduled_tis
+if not scheduled_tis and ids_unrunnable:
+self.log.warning("No tasks to run. unrunnable tasks: %s", 
ids_unrunnable)
+time.sleep(1)
+for ti in scheduled_tis:
 try:
 add_logger_if_needed(ti)
 ti.task = tasks[ti.task_id]
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 98da686184..039fe68f98 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1361,6 +1361,9 @@ class TaskInstance(Base, LoggingMixin):
 # can be changed when calling 'run'
 self.test_mode = False
 
+def __hash__(self):
+return hash((self.task_id, self.dag_id, self.run_id, self.map_index))
+
 @property
 def stats_tags(self) -> dict[str, str]:
 """Returns task instance tags."""



[airflow] branch main updated: Simplify trigger name expression (#34356)

2023-09-13 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new dd39f84653 Simplify trigger name expression (#34356)
dd39f84653 is described below

commit dd39f846537e1a3558a596866424ca4304cb2438
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Sep 13 22:22:55 2023 -0700

Simplify trigger name expression (#34356)

Just more readable.
---
 airflow/jobs/triggerer_job_runner.py | 10 +++---
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
index 9262a53f30..34a271c3ac 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -492,15 +492,11 @@ class TriggerRunner(threading.Thread, LoggingMixin):
 while self.to_create:
 trigger_id, trigger_instance = self.to_create.popleft()
 if trigger_id not in self.triggers:
-task_instance: TaskInstance = trigger_instance.task_instance
-dag_id = task_instance.dag_id
-run_id = task_instance.run_id
-task_id = task_instance.task_id
-map_index = task_instance.map_index
-try_number = task_instance.try_number
+ti: TaskInstance = trigger_instance.task_instance
 self.triggers[trigger_id] = {
 "task": asyncio.create_task(self.run_trigger(trigger_id, 
trigger_instance)),
-"name": 
f"{dag_id}/{run_id}/{task_id}/{map_index}/{try_number} (ID {trigger_id})",
+"name": 
f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} "
+f"(ID {trigger_id})",
 "events": 0,
 }
 else:



[airflow] branch main updated (b37eaae157 -> 3f3edee3f5)

2023-08-29 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from b37eaae157 Cleanup: remove remnants of 'shutdown' task instance state 
(#33893)
 add 3f3edee3f5 Fix files expression for rev heads pre-commit (#33899)

No new revisions were added by this update.

Summary of changes:
 .pre-commit-config.yaml   | 6 +-
 scripts/ci/pre_commit/pre_commit_version_heads_map.py | 1 -
 2 files changed, 5 insertions(+), 2 deletions(-)



[airflow] branch main updated (eb07618d29 -> 2d2a1d699b)

2023-08-09 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from eb07618d29 Fix missing related PRs when cherry-picking (#33261)
 add 2d2a1d699b Remove add_task from TaskGroup (#33262)

No new revisions were added by this update.

Summary of changes:
 airflow/models/abstractoperator.py |  8 
 airflow/models/taskmixin.py|  5 -
 airflow/models/xcom_arg.py | 10 --
 airflow/utils/edgemodifier.py  |  9 -
 airflow/utils/task_group.py| 20 
 5 files changed, 52 deletions(-)



[airflow] branch main updated: Add graph screenshots in setup-teardown howto doc (#33244)

2023-08-08 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d45781fca2 Add graph screenshots in setup-teardown howto doc (#33244)
d45781fca2 is described below

commit d45781fca224074c9e8c298816b7ff4411f9c846
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Aug 8 22:46:10 2023 -0700

Add graph screenshots in setup-teardown howto doc (#33244)
---
 docs/apache-airflow/howto/setup-and-teardown.rst   |  71 -
 docs/apache-airflow/img/setup-teardown-complex.png | Bin 0 -> 114476 bytes
 docs/apache-airflow/img/setup-teardown-group.png   | Bin 0 -> 48789 bytes
 docs/apache-airflow/img/setup-teardown-nesting.png | Bin 0 -> 83934 bytes
 .../apache-airflow/img/setup-teardown-parallel.png | Bin 0 -> 43571 bytes
 docs/apache-airflow/img/setup-teardown-scope.png   | Bin 0 -> 62951 bytes
 .../img/setup-teardown-setup-group.png | Bin 0 -> 76759 bytes
 docs/apache-airflow/img/setup-teardown-simple.png  | Bin 0 -> 29606 bytes
 8 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/docs/apache-airflow/howto/setup-and-teardown.rst 
b/docs/apache-airflow/howto/setup-and-teardown.rst
index 08eb4157ef..355442a299 100644
--- a/docs/apache-airflow/howto/setup-and-teardown.rst
+++ b/docs/apache-airflow/howto/setup-and-teardown.rst
@@ -52,6 +52,10 @@ For convenience we can do this in one line by passing 
``create_cluster`` to the
 
   create_cluster >> run_query >> 
delete_cluster.as_teardown(setups=create_cluster)
 
+Here's the graph for this dag:
+
+.. image:: ../img/setup-teardown-simple.png
+
 Observations:
 
   * If you clear ``run_query`` to run it again, then both ``create_cluster`` 
and ``delete_cluster`` will be cleared.
@@ -62,12 +66,16 @@ Additionally, if we have multiple tasks to wrap, we can use 
the teardown as a co
 
 .. code-block:: python
 
-  with delete_cluster.as_teardown(setups=create_cluster):
+  with delete_cluster().as_teardown(setups=create_cluster()):
   [RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
   WorkOne() >> [do_this_stuff(), do_other_stuff()]
 
 This will set create_cluster to run before the tasks in the context, and 
delete_cluster after them.
 
+Here it is, shown in the graph:
+
+.. image:: ../img/setup-teardown-complex.png
+
 Note that if you are attempting to add an already-instantiated task to a setup 
context you need to do it explicitly:
 
 .. code-block:: python
@@ -87,6 +95,10 @@ Let's look at an example:
 s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
 w2 >> w4
 
+And the graph:
+
+.. image:: ../img/setup-teardown-scope.png
+
 In the above example, ``w1`` and ``w2`` are "between" ``s1`` and ``t1`` and 
therefore are assumed to require ``s1``. Thus if ``w1`` or ``w2`` is cleared, 
so too will be ``s1`` and ``t1``.  But if ``w3`` or ``w4`` is cleared, neither 
``s1`` nor ``t1`` will be cleared.
 
 You can have multiple setup tasks wired to a single teardown.  The teardown 
will run if at least one of the setups completed successfully.
@@ -97,7 +109,7 @@ You can have a setup without a teardown:
 
 create_cluster >> run_query >> other_task
 
-In this case, everything downstream of create_cluster is assumed to require 
it.  So if you clear query_two, it will also clear create_cluster.  Suppose 
that we add a teardown for create_cluster after run_query:
+In this case, everything downstream of create_cluster is assumed to require 
it.  So if you clear other_task, it will also clear create_cluster.  Suppose 
that we add a teardown for create_cluster after run_query:
 
 .. code-block:: python
 
@@ -119,7 +131,7 @@ Controlling dag run state
 
 Another feature of setup / teardown tasks is you can choose whether or not the 
teardown task should have an impact on dag run state.  Perhaps you don't care 
if the "cleanup" work performed by your teardown task fails, and you only 
consider the dag run a failure if the "work" tasks fail.  By default, teardown 
tasks are not considered for dag run state.
 
-Continuing with the example above, if you want the run's success to depend on 
``delete_cluster``, then set``on_failure_fail_dagrun=True`` when setting 
``delete_cluster`` as teardown. For example:
+Continuing with the example above, if you want the run's success to depend on 
``delete_cluster``, then set ``on_failure_fail_dagrun=True`` when setting 
``delete_cluster`` as teardown. For example:
 
 .. code-block:: python
 
@@ -128,20 +140,24 @@ Continuing with the example above, if you want the run's 
success to depend on ``
 Authoring with task groups
 """"""""""""""""""""""""""

[airflow] branch main updated: Don't ignore setups when arrowing from group (#33097)

2023-08-08 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new cd7e7bcb23 Don't ignore setups when arrowing from group (#33097)
cd7e7bcb23 is described below

commit cd7e7bcb2310dea19f7ee946716a7c91ed610c68
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Aug 8 12:02:47 2023 -0700

Don't ignore setups when arrowing from group (#33097)

This enables us to have a group with just setups in it.
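
A hedged sketch of the pattern this enables (hypothetical dag and task
names, not from the commit):

    import pendulum
    from airflow.decorators import setup, task, task_group
    from airflow.models.dag import DAG

    with DAG(dag_id="setup_only_group", schedule=None,
             start_date=pendulum.datetime(2023, 1, 1)):

        @task_group
        def provision():
            @setup
            def create_cluster(): ...

            @setup
            def create_bucket(): ...

            create_cluster()
            create_bucket()

        @task
        def run_query(): ...

        # run_query now ends up downstream of both setups; previously,
        # setups were ignored when arrowing from a group, so a setup-only
        # group produced no edges here.
        provision() >> run_query()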
---
 airflow/utils/task_group.py|  14 +++--
 tests/utils/test_task_group.py | 120 -
 2 files changed, 128 insertions(+), 6 deletions(-)

diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 3c3a01bc7d..b6c40a14a9 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -370,21 +370,25 @@ class TaskGroup(DAGNode):
 tasks = list(self)
 ids = {x.task_id for x in tasks}
 
-def recurse_for_first_non_setup_teardown(task):
+def recurse_for_first_non_teardown(task):
 for upstream_task in task.upstream_list:
 if upstream_task.task_id not in ids:
+# upstream task is not in task group
+continue
+elif upstream_task.is_teardown:
+yield from recurse_for_first_non_teardown(upstream_task)
+elif task.is_teardown and upstream_task.is_setup:
+# don't go through the teardown-to-setup path
 continue
-if upstream_task.is_setup or upstream_task.is_teardown:
-yield from 
recurse_for_first_non_setup_teardown(upstream_task)
 else:
 yield upstream_task
 
 for task in tasks:
 if task.downstream_task_ids.isdisjoint(ids):
-if not (task.is_teardown or task.is_setup):
+if not task.is_teardown:
 yield task
 else:
-yield from recurse_for_first_non_setup_teardown(task)
+yield from recurse_for_first_non_teardown(task)
 
 def child_id(self, label):
 """Prefix label with group_id if prefix_group_id is True. Otherwise 
return the label as-is."""
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index c021d98b88..a9f61debc6 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -22,7 +22,13 @@ from datetime import timedelta
 import pendulum
 import pytest
 
-from airflow.decorators import dag, task as task_decorator, task_group as 
task_group_decorator
+from airflow.decorators import (
+dag,
+setup,
+task as task_decorator,
+task_group as task_group_decorator,
+teardown,
+)
 from airflow.exceptions import TaskAlreadyInTaskGroup
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
@@ -1479,3 +1485,115 @@ def test_task_group_arrow_with_setups_teardowns():
 tg1 >> w2
 assert t1.downstream_task_ids == set()
 assert w1.downstream_task_ids == {"tg1.t1", "w2"}
+
+
+def test_task_group_arrow_with_setup_group():
+with DAG(dag_id="setup_group_teardown_group", start_date=pendulum.now()):
+with TaskGroup("group_1") as g1:
+
+@setup
+def setup_1():
+...
+
+@setup
+def setup_2():
+...
+
+s1 = setup_1()
+s2 = setup_2()
+
+with TaskGroup("group_2") as g2:
+
+@teardown
+def teardown_1():
+...
+
+@teardown
+def teardown_2():
+...
+
+t1 = teardown_1()
+t2 = teardown_2()
+
+@task_decorator
+def work():
+...
+
+w1 = work()
+g1 >> w1 >> g2
+t1.as_teardown(setups=s1)
+t2.as_teardown(setups=s2)
+assert set(s1.operator.downstream_task_ids) == {"work", 
"group_2.teardown_1"}
+assert set(s2.operator.downstream_task_ids) == {"work", 
"group_2.teardown_2"}
+assert set(w1.operator.downstream_task_ids) == {"group_2.teardown_1", 
"group_2.teardown_2"}
+assert set(t1.operator.downstream_task_ids) == set()
+assert set(t2.operator.downstream_task_ids) == set()
+
+def get_nodes(group):
+d = task_group_to_dict(group)
+new_d = {}
+new_d["id"] = d["id"]
+new_d["children"] = [{"id": x["id"]} for x in d["children"]]
+return new_d
+
+assert ge

[airflow] branch main updated: Use arrow with no circle for setup / teardown (#33098)

2023-08-04 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new b57fc45861 Use arrow with no circle for setup / teardown (#33098)
b57fc45861 is described below

commit b57fc45861a7527541e684c00333d983290a88c0
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Aug 4 01:12:26 2023 -0700

Use arrow with no circle for setup / teardown (#33098)

Just seems easier to see.
---
 airflow/www/static/js/dag/details/graph/Node.tsx | 9 +++--
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/airflow/www/static/js/dag/details/graph/Node.tsx 
b/airflow/www/static/js/dag/details/graph/Node.tsx
index 25827f2595..f2d6f92c09 100644
--- a/airflow/www/static/js/dag/details/graph/Node.tsx
+++ b/airflow/www/static/js/dag/details/graph/Node.tsx
@@ -28,10 +28,7 @@ import { getGroupAndMapSummary, hoverDelay } from 
"src/utils";
 import Tooltip from "src/components/Tooltip";
 import InstanceTooltip from "src/dag/InstanceTooltip";
 import { useContainerRef } from "src/context/containerRef";
-import {
-  MdOutlineArrowCircleUp,
-  MdOutlineArrowCircleDown,
-} from "react-icons/md";
+import { ImArrowUpRight2, ImArrowDownRight2 } from "react-icons/im";
 
 export interface CustomNodeProps {
   label: string;
@@ -145,10 +142,10 @@ export const BaseNode = ({
 {taskName}
   
   {setupTeardownType === "setup" && (
-
+
   )}
   {setupTeardownType === "teardown" && (
-
+
   )}
 
 {!!instance && instance.state && (



[airflow] branch main updated: Fail stop feature can work with setup / teardown (#32985)

2023-08-01 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 5e78a09495 Fail stop feature can work with setup / teardown (#32985)
5e78a09495 is described below

commit 5e78a0949523f4489c78e0d956459913376bad0e
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Aug 1 13:52:32 2023 -0700

Fail stop feature can work with setup / teardown (#32985)
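
A minimal hypothetical sketch of what this permits (dag and task ids are
illustrative):

    import pendulum
    from airflow.models.dag import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG(dag_id="fail_stop_with_teardown", schedule=None,
             start_date=pendulum.datetime(2023, 1, 1), fail_stop=True):
        create = EmptyOperator(task_id="create")
        work = EmptyOperator(task_id="work")
        delete = EmptyOperator(task_id="delete")
        # as_teardown gives `delete` the ALL_DONE_SETUP_SUCCESS trigger
        # rule, which a fail-stop dag now accepts alongside ALL_SUCCESS.
        create >> work >> delete.as_teardown(setups=create)
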
---
 airflow/exceptions.py   |  19 --
 airflow/models/baseoperator.py  |  11 ++-
 airflow/models/dag.py   |   6 +-
 airflow/models/taskinstance.py  |  30 ++---
 tests/models/test_baseoperator.py   |  74 +++--
 tests/models/test_dag.py|  12 ++--
 tests/models/test_mappedoperator.py | 129 +++-
 7 files changed, 250 insertions(+), 31 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index ea162fe8db..fe0c4e416b 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -25,6 +25,8 @@ import warnings
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Any, NamedTuple, Sized
 
+from airflow.utils.trigger_rule import TriggerRule
+
 if TYPE_CHECKING:
 from airflow.models import DAG, DagRun
 
@@ -214,20 +216,23 @@ class DagFileExists(AirflowBadRequest):
 warnings.warn("DagFileExists is deprecated and will be removed.", 
DeprecationWarning, stacklevel=2)
 
 
-class DagInvalidTriggerRule(AirflowException):
+class FailStopDagInvalidTriggerRule(AirflowException):
 """Raise when a dag has 'fail_stop' enabled yet has a non-default trigger 
rule."""
 
+_allowed_rules = (TriggerRule.ALL_SUCCESS, 
TriggerRule.ALL_DONE_SETUP_SUCCESS)
+
 @classmethod
-def check(cls, dag: DAG | None, trigger_rule: str):
-from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE
+def check(cls, *, dag: DAG | None, trigger_rule: TriggerRule):
+"""
+Check that fail_stop dag tasks have allowable trigger rules.
 
-if dag is not None and dag.fail_stop and trigger_rule != 
DEFAULT_TRIGGER_RULE:
+:meta private:
+"""
+if dag is not None and dag.fail_stop and trigger_rule not in 
cls._allowed_rules:
 raise cls()
 
 def __str__(self) -> str:
-from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE
-
-return f"A 'fail-stop' dag can only have {DEFAULT_TRIGGER_RULE} 
trigger rule"
+return f"A 'fail-stop' dag can only have {TriggerRule.ALL_SUCCESS} 
trigger rule"
 
 
 class DuplicateTaskIdFound(AirflowException):
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index d49babbeb0..99911f322e 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -54,7 +54,12 @@ from sqlalchemy.orm import Session
 from sqlalchemy.orm.exc import NoResultFound
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, DagInvalidTriggerRule, 
RemovedInAirflow3Warning, TaskDeferred
+from airflow.exceptions import (
+AirflowException,
+FailStopDagInvalidTriggerRule,
+RemovedInAirflow3Warning,
+TaskDeferred,
+)
 from airflow.lineage import apply_lineage, prepare_lineage
 from airflow.models.abstractoperator import (
 DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
@@ -801,8 +806,6 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 dag = dag or DagContext.get_current_dag()
 task_group = task_group or TaskGroupContext.get_current_task_group(dag)
 
-DagInvalidTriggerRule.check(dag, trigger_rule)
-
 self.task_id = task_group.child_id(task_id) if task_group else task_id
 if not self.__from_mapped and task_group:
 task_group.add(self)
@@ -868,6 +871,8 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 )
 
 self.trigger_rule: TriggerRule = TriggerRule(trigger_rule)
+FailStopDagInvalidTriggerRule.check(dag=dag, 
trigger_rule=self.trigger_rule)
+
 self.depends_on_past: bool = depends_on_past
 self.ignore_first_depends_on_past: bool = ignore_first_depends_on_past
 self.wait_for_past_depends_before_skipping: bool = 
wait_for_past_depends_before_skipping
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 5d2172c56d..8a5f6c714b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -85,8 +85,8 @@ from airflow.exceptions import (
 AirflowDagInconsistent,
 AirflowException,
 AirflowSkipException,
-DagInvalidTriggerRule,
 DuplicateTaskIdFound,
+FailStopDagInvalidTriggerRule,
 RemovedInAirflow3Warning,
 TaskNotFound,
 )
@@ -722,6 +722,7 @@ class DAG(LoggingMix

[airflow] branch main updated: Revert "Add links to DAGRun / DAG / Task in templates-ref.rst (#32245)" (#33010)

2023-08-01 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 9fb26db0e7 Revert "Add links to DAGRun / DAG / Task in 
templates-ref.rst (#32245)" (#33010)
9fb26db0e7 is described below

commit 9fb26db0e766d22cd67d0ed1124dd97a91b519bd
Author: fritz-astronomer <80706212+fritz-astrono...@users.noreply.github.com>
AuthorDate: Tue Aug 1 14:31:08 2023 -0400

Revert "Add links to DAGRun / DAG / Task in templates-ref.rst (#32245)" 
(#33010)

This reverts commit b6477d0c3d498a8a1962fa67e0a92326ee6393fb.
---
 docs/apache-airflow/templates-ref.rst | 24 
 1 file changed, 8 insertions(+), 16 deletions(-)

diff --git a/docs/apache-airflow/templates-ref.rst 
b/docs/apache-airflow/templates-ref.rst
index 7f59091405..34ba7d2ece 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -47,16 +47,16 @@ VariableType
  Description
   | Example: 
``20180101T00+``.
 ``{{ ts_nodash }}`` str   | Same as 
``{{ dag_run.logical_date | ts_nodash }}``.
   | Example: 
``20180101T00``.
-``{{ prev_data_interval_start_success }}``  `pendulum.DateTime`_  | Start of 
the data interval of the prior successful `DAG Run`_.
+``{{ prev_data_interval_start_success }}``  `pendulum.DateTime`_  | Start of 
the data interval of the prior successful DAG run.
 | ``None``| Added in 
version 2.2.
-``{{ prev_data_interval_end_success }}```pendulum.DateTime`_  | End of the 
data interval of the prior successful `DAG Run`_.
+``{{ prev_data_interval_end_success }}```pendulum.DateTime`_  | End of the 
data interval of the prior successful DAG run.
 | ``None``| Added in 
version 2.2.
-``{{ prev_start_date_success }}``   `pendulum.DateTime`_  Start date 
from prior successful `DAG Run`_ (if available).
+``{{ prev_start_date_success }}``   `pendulum.DateTime`_  Start date 
from prior successful DAG run (if available).
 | ``None``
-``{{ dag }}``   DAG   The 
currently running `DAG`_. You can read more about DAGs in :doc:`DAGs 
`.
-``{{ task }}``  BaseOperator  | The 
currently running `Task`_. You can read more about Tasks in 
:doc:`core-concepts/operators`
+``{{ dag }}``   DAG   The 
currently running DAG.
+``{{ task }}``  BaseOperator  | The 
currently running task.
 ``{{ macros }}``  | A 
reference to the macros package. See Macros_ below.
-``{{ task_instance }}`` TaskInstance  The 
currently running `Task Instance`_.
+``{{ task_instance }}`` TaskInstance  The 
currently running task instance.
 ``{{ ti }}``TaskInstance  Same as ``{{ 
task_instance }}``.
 ``{{ params }}``dict[str, Any]| The 
user-defined params. This can be overridden by the mapping
   | passed to 
``trigger_dag -c`` if ``dag_run_conf_overrides_params``
@@ -68,8 +68,8 @@ VariableType  
Description
   | 
``{dag_id}__{task_id}__{ds_nodash}``.
 ``{{ conf }}``  AirflowConfigParser   | The full 
configuration object representing the content of your
   | 
``airflow.cfg``. See :mod:`airflow.configuration.conf`.
-``{{ run_id }}``str   The 
currently running `DAG Run`_'s run ID.
-``{{ dag_run }}``   DagRunThe 
currently running `DAG Run`_.
+``{{ run_id }}``str   The 
currently running DAG run's run ID.
+``{{ dag_run }}``   DagRunThe 
currently running DAG run.
 ``{{ test_mode }}`` bool  Whether the 
task instance was run by the ``airflow test`` CLI.
 ``{{ expanded_ti_count }}`` int | ``None``| Number of 
task instances that a mapped task was expanded into. If
   | the 
current task is not mapped, this should be ``None``.

[airflow] branch main updated: Add initial docs for setup / teardown (#32169)

2023-07-20 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new d70fecfaf6 Add initial docs for setup / teardown (#32169)
d70fecfaf6 is described below

commit d70fecfaf68dbc34c818290ad35f48bf01044dd9
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Jul 20 13:12:18 2023 -0700

Add initial docs for setup / teardown (#32169)

Co-authored-by: Akash Sharma <35839624+adave...@users.noreply.github.com>
Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
Co-authored-by: Tzu-ping Chung 
---
 .../example_setup_teardown_taskflow.py |  85 +-
 docs/apache-airflow/core-concepts/dags.rst |  18 +-
 docs/apache-airflow/howto/index.rst|   1 +
 docs/apache-airflow/howto/setup-and-teardown.rst   | 184 +
 docs/spelling_wordlist.txt |   3 +
 5 files changed, 247 insertions(+), 44 deletions(-)

diff --git a/airflow/example_dags/example_setup_teardown_taskflow.py 
b/airflow/example_dags/example_setup_teardown_taskflow.py
index 128534f1d2..4dcdbf253f 100644
--- a/airflow/example_dags/example_setup_teardown_taskflow.py
+++ b/airflow/example_dags/example_setup_teardown_taskflow.py
@@ -31,68 +31,75 @@ with DAG(
 ) as dag:
 
 @task
-def task_1():
+def my_first_task():
 print("Hello 1")
 
 @task
-def task_2():
+def my_second_task():
 print("Hello 2")
 
 @task
-def task_3():
+def my_third_task():
 print("Hello 3")
 
 # you can set setup / teardown relationships with the `as_teardown` method.
-t1 = task_1()
-t2 = task_2()
-t3 = task_3()
-t1 >> t2 >> t3.as_teardown(setups=t1)
+task_1 = my_first_task()
+task_2 = my_second_task()
+task_3 = my_third_task()
+task_1 >> task_2 >> task_3.as_teardown(setups=task_1)
 
-# the method `as_teadrown` will mark t3 as teardown, t1 as setup, and 
arrow t1 >> t3
-# now if you clear t2 (downstream), then t1 will be cleared in addition to 
t3
+# The method `as_teardown` will mark task_3 as teardown, task_1 as setup, 
and
+# arrow task_1 >> task_3.
+# Now if you clear task_2, then it's setup task, task_1, will be cleared in
+# addition to its teardown task, task_3
 
 # it's also possible to use a decorator to mark a task as setup or
 # teardown when you define it. see below.
 
 @setup
-def dag_setup():
-print("I am dag_setup")
+def outer_setup():
+print("I am outer_setup")
+return "some cluster id"
 
 @teardown
-def dag_teardown():
-print("I am dag_teardown")
+def outer_teardown(cluster_id):
+print("I am outer_teardown")
+print(f"Tearing down cluster: {cluster_id}")
 
 @task
-def dag_normal_task():
+def outer_work():
 print("I am just a normal task")
 
-s = dag_setup()
-t = dag_teardown()
-
-# by using the decorators, dag_setup and dag_teardown are already marked 
as setup / teardown
-# now we just need to make sure they are linked directly
-# what we need to do is this::
-# s >> t
-# s >> dag_normal_task() >> t
-# but we can use a context manager to make it cleaner
-with s >> t:
-dag_normal_task()
-
 @task_group
 def section_1():
-@task
-def my_setup():
+@setup
+def inner_setup():
 print("I set up")
+return "some_cluster_id"
 
 @task
-def my_teardown():
-print("I tear down")
-
-@task
-def hello():
-print("I say hello")
-
-(s := my_setup()) >> hello() >> my_teardown().as_teardown(setups=s)
-
-# and let's put section 1 inside the "dag setup" and "dag teardown"
-s >> section_1() >> t
+def inner_work(cluster_id):
+print(f"doing some work with {cluster_id=}")
+
+@teardown
+def inner_teardown(cluster_id):
+print(f"tearing down {cluster_id=}")
+
+# this passes the return value of `inner_setup` to both `inner_work` 
and `inner_teardown`
+inner_setup_task = inner_setup()
+inner_work(inner_setup_task) >> inner_teardown(inner_setup_task)
+
+# by using the decorators, outer_setup and outer_teardown are already 
marked as setup / teardown
+# now we just need to make sure they are linked directly.  At a low level, 
what we need
+# to do so is the following::
+# s = outer_se

[airflow] branch main updated: Fix bad delete logic for dagruns (#32684)

2023-07-19 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 7092cfdbbf Fix bad delete logic for dagruns (#32684)
7092cfdbbf is described below

commit 7092cfdbbfcfd3c03909229daa741a5bcd7ccc64
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Jul 19 13:27:56 2023 -0700

Fix bad delete logic for dagruns (#32684)

Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
---
 airflow/www/utils.py| 12 ++--
 tests/www/test_utils.py | 47 +--
 2 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 3624fbe841..bcf368ea20 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -50,6 +50,7 @@ from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
 from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.json import WebEncoder
+from airflow.utils.sqlalchemy import tuple_in_condition
 from airflow.utils.state import State, TaskInstanceState
 from airflow.www.forms import DateTimeWithTimezoneField
 from airflow.www.widgets import AirflowDateTimePickerWidget
@@ -60,6 +61,8 @@ if TYPE_CHECKING:
 
 from airflow.www.fab_security.sqla.manager import SecurityManager
 
+TI = TaskInstance
+
 
 def datetime_to_string(value: DateTime | None) -> str | None:
 if value is None:
@@ -844,12 +847,17 @@ class DagRunCustomSQLAInterface(CustomSQLAInterface):
 """
 
 def delete(self, item: Model, raise_exception: bool = False) -> bool:
-self.session.execute(delete(TaskInstance).where(TaskInstance.run_id == 
item.run_id))
+self.session.execute(delete(TI).where(TI.dag_id == item.dag_id, 
TI.run_id == item.run_id))
 return super().delete(item, raise_exception=raise_exception)
 
 def delete_all(self, items: list[Model]) -> bool:
 self.session.execute(
-delete(TaskInstance).where(TaskInstance.run_id.in_(item.run_id for 
item in items))
+delete(TI).where(
+tuple_in_condition(
+(TI.dag_id, TI.run_id),
+((x.dag_id, x.run_id) for x in items),
+)
+)
 )
 return super().delete_all(items)
 
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index 12fe017c62..1dd1665a9b 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -17,17 +17,20 @@
 # under the License.
 from __future__ import annotations
 
+import itertools
 import re
 from datetime import datetime
 from unittest.mock import Mock
 from urllib.parse import parse_qs
 
+import pendulum
 from bs4 import BeautifulSoup
 from markupsafe import Markup
 
+from airflow.models import DagRun
 from airflow.utils import json as utils_json
 from airflow.www import utils
-from airflow.www.utils import json_f, wrapped_markdown
+from airflow.www.utils import DagRunCustomSQLAInterface, json_f, 
wrapped_markdown
 
 
 class TestUtils:
@@ -156,7 +159,6 @@ class TestUtils:
 assert "alert(1)" not in html
 
 def test_task_instance_link(self):
-
 from airflow.www.app import cached_app
 
 with cached_app(testing=True).test_request_context():
@@ -413,3 +415,44 @@ class TestWrappedMarkdown:
 """
 == rendered
 )
+
+
+def test_dag_run_custom_sqla_interface_delete_no_collateral_damage(dag_maker, 
session):
+interface = DagRunCustomSQLAInterface(obj=DagRun, session=session)
+dag_ids = (f"test_dag_{x}" for x in range(1, 4))
+dates = (pendulum.datetime(2023, 1, x) for x in range(1, 4))
+for dag_id, date in itertools.product(dag_ids, dates):
+with dag_maker(dag_id=dag_id) as dag:
+dag.create_dagrun(execution_date=date, state="running", 
run_type="scheduled")
+dag_runs = session.query(DagRun).all()
+assert len(dag_runs) == 9
+assert len(set(x.run_id for x in dag_runs)) == 3
+run_id_for_single_delete = "scheduled__2023-01-01T00:00:00+00:00"
+# we have 3 runs with this same run_id
+assert len(list(x for x in dag_runs if x.run_id == 
run_id_for_single_delete)) == 3
+# each is a different dag
+
+# if we delete one, it shouldn't delete the others
+one_run = [x for x in dag_runs if x.run_id == run_id_for_single_delete][0]
+assert interface.delete(item=one_run) is True
+session.commit()
+dag_runs = session.query(DagRun).all()
+# we should have one fewer dag run now
+assert len(dag_runs) == 8
+
+# now let's try multi delete
+run_id_for_multi_delete = "scheduled__2023-01-02T00:00:00+00:00"
+# verify we have 3
+runs_of_interest = [x for x i

[airflow] branch main updated: Allow setup without teardown (#32679)

2023-07-19 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new a2eaca8977 Allow setup without teardown (#32679)
a2eaca8977 is described below

commit a2eaca897734ac43d5263832b319c99b3510b7b5
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Wed Jul 19 11:54:58 2023 -0700

Allow setup without teardown (#32679)
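
A hypothetical sketch of what is now allowed (task ids are illustrative):

    import pendulum
    from airflow.models.dag import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG(dag_id="setup_without_teardown", schedule=None,
             start_date=pendulum.datetime(2023, 1, 1)):
        create = EmptyOperator(task_id="create").as_setup()
        work = EmptyOperator(task_id="work")
        # a setup with no teardown is no longer flagged as inconsistent;
        # everything downstream is assumed to require it, so clearing
        # `work` also clears `create`.
        create >> work
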
---
 airflow/models/abstractoperator.py |  7 +++-
 airflow/models/dag.py  |  4 --
 tests/models/test_dag.py   | 86 +++---
 3 files changed, 29 insertions(+), 68 deletions(-)

diff --git a/airflow/models/abstractoperator.py 
b/airflow/models/abstractoperator.py
index 9a8f88ce7d..0c6d89fff3 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -302,7 +302,8 @@ class AbstractOperator(Templater, DAGNode):
 This method is meant to be used when we are clearing the task 
(non-upstream) and we need
 to add in the *relevant* setups and their teardowns.
 
-Relevant in this case means, the setup has a teardown that is 
downstream of ``self``.
+Relevant in this case means, the setup has a teardown that is 
downstream of ``self``,
+or the setup has no teardowns.
 """
 downstream_teardown_ids = {
 x.task_id for x in self.get_flat_relatives(upstream=False) if 
x.is_teardown
@@ -310,7 +311,9 @@ class AbstractOperator(Templater, DAGNode):
 for task in self.get_flat_relatives(upstream=True):
 if not task.is_setup:
 continue
-if not 
task.downstream_task_ids.isdisjoint(downstream_teardown_ids):
+has_no_teardowns = not any(True for x in task.downstream_list if 
x.is_teardown)
+# if task has no teardowns or has teardowns downstream of self
+if has_no_teardowns or 
task.downstream_task_ids.intersection(downstream_teardown_ids):
 yield task
 for t in task.downstream_list:
 if t.is_teardown and not t == self:
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 391be9e582..1d380274f6 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -714,10 +714,6 @@ class DAG(LoggingMixin):
 :meta private:
 """
 for task in self.tasks:
-if task.is_setup and not any(x.is_teardown for x in 
task.downstream_list):
-raise AirflowDagInconsistent(
-f"Dag has setup without teardown: dag='{self.dag_id}', 
task='{task.task_id}'"
-)
 if task.is_teardown and all(x.is_setup for x in 
task.upstream_list):
 raise AirflowDagInconsistent(
 f"Dag has teardown task without an upstream work task: 
dag='{self.dag_id}',"
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 14527fd7fa..494c7c7c66 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3623,39 +3623,46 @@ class TestTaskClearingSetupTeardownBehavior:
 
 def test_get_flat_relative_ids_with_setup(self):
 with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
-s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
+s1, w1, w2, w3, w4, t1 = self.make_tasks(dag, "s1, w1, w2, w3, w4, 
t1")
 
 s1 >> w1 >> w2 >> w3
 
-# there is no teardown downstream of w1, so we assume w1 does not need 
s1
-assert set(w1.get_upstreams_only_setups_and_teardowns()) == set()
+# w1 is downstream of s1, and s1 has no teardown, so clearing w1 
clears s1
+assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1}
 # same with w2 and w3
-assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
-assert set(w3.get_upstreams_only_setups_and_teardowns()) == set()
-assert self.cleared_downstream(w2) == {w2, w3}
+assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1}
+assert set(w3.get_upstreams_only_setups_and_teardowns()) == {s1}
+# so if we clear w2, we should also get s1, and w3, but not w1
+assert self.cleared_downstream(w2) == {s1, w2, w3}
 
 w3 >> t1
 
 # now, w2 has a downstream teardown, but it's not connected directly 
to s1
-# (this is how we signal "this is the teardown for this setup")
-# so still, we don't regard s1 as a setup for w2
-assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
-assert self.cleared_downstream(w2) == {w2, w3, t1}
+assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1}
+# so if we clear downstream then s1 will

[airflow] branch main updated: Speed up calculation of leaves and roots for task groups (#32592)

2023-07-14 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new fad297c900 Speed up calculation of leaves and roots for task groups 
(#32592)
fad297c900 is described below

commit fad297c900551301c1dcb33e9128959e18fe737e
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Jul 14 10:50:52 2023 -0700

Speed up calculation of leaves and roots for task groups (#32592)

Previously, every call to has_task would iterate the group.  Also, using a
set operation is faster than `any`.
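
A generic illustration of the idea, outside Airflow (names are made up):

    group_ids = ["s1", "w1", "w2", "t1"]
    upstream_of_w1 = ["s1", "task_outside_group"]

    # slower shape: one linear scan of group_ids per upstream id
    has_internal_upstream = any(u in group_ids for u in upstream_of_w1)

    # faster shape: build the set once, then a single intersection
    ids = set(group_ids)
    has_internal_upstream = bool(set(upstream_of_w1) & ids)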

Co-authored-by: Tzu-ping Chung 
---
 airflow/utils/task_group.py | 20 
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index e55a4abbe1..2b117ca7da 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -362,8 +362,10 @@ class TaskGroup(DAGNode):
 Returns a generator of tasks that are root tasks, i.e. those with no 
upstream
 dependencies within the TaskGroup.
 """
-for task in self:
-if not any(self.has_task(parent) for parent in 
task.get_direct_relatives(upstream=True)):
+tasks = list(self)
+ids = {x.task_id for x in tasks}
+for task in tasks:
+if not task.upstream_task_ids.intersection(ids):
 yield task
 
 def get_leaves(self) -> Generator[BaseOperator, None, None]:
@@ -371,22 +373,24 @@ class TaskGroup(DAGNode):
 Returns a generator of tasks that are leaf tasks, i.e. those with no 
downstream
 dependencies within the TaskGroup.
 """
+tasks = list(self)
+ids = {x.task_id for x in tasks}
 
-def recurse_for_first_non_setup_teardown(group, task):
+def recurse_for_first_non_setup_teardown(task):
 for upstream_task in task.upstream_list:
-if not group.has_task(upstream_task):
+if upstream_task.task_id not in ids:
 continue
 if upstream_task.is_setup or upstream_task.is_teardown:
-yield from recurse_for_first_non_setup_teardown(group, 
upstream_task)
+yield from 
recurse_for_first_non_setup_teardown(upstream_task)
 else:
 yield upstream_task
 
-for task in self:
-if not any(self.has_task(x) for x in 
task.get_direct_relatives(upstream=False)):
+for task in tasks:
+if not task.downstream_task_ids.intersection(ids):
 if not (task.is_teardown or task.is_setup):
 yield task
 else:
-yield from recurse_for_first_non_setup_teardown(self, task)
+yield from recurse_for_first_non_setup_teardown(task)
 
 def child_id(self, label):
 """



[airflow] branch main updated: Fix short circuit operator re teardowns (#32538)

2023-07-13 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new bb5307850d Fix short circuit operator re teardowns (#32538)
bb5307850d is described below

commit bb5307850d93df16f442bfce2f388c90d63c6f9a
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Jul 13 17:52:27 2023 -0700

Fix short circuit operator re teardowns (#32538)

Short circuit operator should not skip teardown tasks.  Note that if there's a
setup downstream that gets skipped as a result, its teardown may still end up
skipped by the scheduler in accordance with trigger rules, but that is handled
by the scheduler, not by this operator.

Also did a little optimization so that most of the time we don't need to
create an intermediate list of to-be-skipped tasks in this operator's execute
(only when debugging do we need to "materialize" the list first), and
consolidated the logic so that there is only one call to `skip` instead of two
different ones, one on each side of an if / else.
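
A hypothetical DAG sketching the fixed behavior (task ids are
illustrative):

    import pendulum
    from airflow.models.dag import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import ShortCircuitOperator

    with DAG(dag_id="short_circuit_teardown", schedule=None,
             start_date=pendulum.datetime(2023, 1, 1)):
        create = EmptyOperator(task_id="create_resource")
        check = ShortCircuitOperator(task_id="check", python_callable=lambda: False)
        work = EmptyOperator(task_id="work")
        delete = EmptyOperator(task_id="delete_resource")

        # when `check` returns False, `work` is skipped, but the teardown
        # `delete_resource` is no longer skipped by the operator itself.
        create >> check >> work >> delete.as_teardown(setups=create)
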
---
 airflow/models/baseoperator.py |   5 +-
 airflow/operators/python.py|  46 -
 tests/operators/test_python.py | 142 -
 3 files changed, 173 insertions(+), 20 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index ccbcb12efb..40fe3b8f0b 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -75,7 +75,7 @@ from airflow.models.mappedoperator import OperatorPartial, 
validate_mapping_kwar
 from airflow.models.param import ParamsDict
 from airflow.models.pool import Pool
 from airflow.models.taskinstance import TaskInstance, clear_task_instances
-from airflow.models.taskmixin import DAGNode, DependencyMixin
+from airflow.models.taskmixin import DependencyMixin
 from airflow.serialization.enums import DagAttributeTypes
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
@@ -100,6 +100,7 @@ if TYPE_CHECKING:
 import jinja2  # Slow import.
 
 from airflow.models.dag import DAG
+from airflow.models.operator import Operator
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.models.xcom_arg import XComArg
 from airflow.utils.task_group import TaskGroup
@@ -1373,7 +1374,7 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 self.log.info("Rendering template for %s", field)
 self.log.info(content)
 
-def get_direct_relatives(self, upstream: bool = False) -> 
Iterable[DAGNode]:
+def get_direct_relatives(self, upstream: bool = False) -> 
Iterable[Operator]:
 """
 Get list of the direct relatives to the current task, upstream or
 downstream.
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index a0ea5be80c..2db5ca69c3 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import inspect
+import logging
 import os
 import pickle
 import shutil
@@ -30,7 +31,7 @@ from collections.abc import Container
 from pathlib import Path
 from tempfile import TemporaryDirectory
 from textwrap import dedent
-from typing import Any, Callable, Collection, Iterable, Mapping, Sequence
+from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, 
Mapping, Sequence, cast
 
 import dill
 
@@ -49,6 +50,9 @@ from airflow.utils.operator_helpers import KeywordParameters
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, 
write_python_script
 
+if TYPE_CHECKING:
+from pendulum.datetime import DateTime
+
 
 def task(python_callable: Callable | None = None, multiple_outputs: bool | 
None = None, **kwargs):
 """Deprecated. Use :func:`airflow.decorators.task` instead.
@@ -251,27 +255,35 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
 self.log.info("Proceeding with downstream tasks...")
 return condition
 
-downstream_tasks = context["task"].get_flat_relatives(upstream=False)
-self.log.debug("Downstream task IDs %s", downstream_tasks)
+if not self.downstream_task_ids:
+self.log.info("No downstream tasks; nothing to do.")
+return
 
-if downstream_tasks:
-dag_run = context["dag_run"]
-execution_date = dag_run.execution_date
+dag_run = context["dag_run"]
 
+def get_tasks_to_skip():
 if self.ignore_downstream_trigger_rules is True:
-self.log.info("Skipping all downstream tasks..."

[airflow] branch main updated: Fix tests and add logic to handle clearing setup directly (#32430)

2023-07-07 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 566bc1b68b Fix tests and add logic to handle clearing setup directly 
(#32430)
566bc1b68b is described below

commit 566bc1b68b4e1643761b4e8518e5e556b8e6e82c
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Jul 7 15:52:14 2023 -0700

Fix tests and add logic to handle clearing setup directly (#32430)
---
 airflow/models/dag.py|  2 ++
 tests/models/test_dag.py | 77 +++-
 2 files changed, 72 insertions(+), 7 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1ec78b64c3..234db54b15 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2373,6 +2373,8 @@ class DAG(LoggingMixin):
 also_include.extend(t.get_upstreams_follow_setups())
 else:
 
also_include.extend(t.get_upstreams_only_setups_and_teardowns())
+if t.is_setup and not include_downstream:
+also_include.extend(x for x in t.downstream_list if 
x.is_teardown)
 
 direct_upstreams: list[Operator] = []
 if include_direct_upstream:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index bd460e6cc7..e4a3290a5d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3590,12 +3590,23 @@ class TestTaskClearingSetupTeardownBehavior:
 upstream = True
 return set(
 task.dag.partial_subset(
-task_ids_or_regex=[task.task_id],
+task_ids_or_regex=task.task_id,
 include_downstream=not upstream,
 include_upstream=upstream,
 ).tasks
 )
 
+@staticmethod
+def cleared_neither(task):
+"""Helper to return tasks that would be cleared if **upstream** 
selected."""
+return set(
+task.dag.partial_subset(
+task_ids_or_regex=[task.task_id],
+include_downstream=False,
+include_upstream=False,
+).tasks
+)
+
 def test_get_flat_relative_ids_with_setup(self):
 with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
 s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
@@ -3823,18 +3834,70 @@ class TestTaskClearingSetupTeardownBehavior:
 """
 with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
 s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
-s1 >> w1 >> t1
+s1 >> w1 >> t1.as_teardown(setups=s1)
 s1 >> w2
-self.cleared_upstream(w2) == {s1, w2, t1}
+# w2 is downstream of s1, so when clearing upstream, it should 
clear s1 (since it
+# is upstream of w2) and t1 since it's the teardown for s1 even 
though not downstream of w1
+assert self.cleared_upstream(w2) == {s1, w2, t1}
 
-def clearing_teardown_no_clear_setup(self):
+def test_clearing_teardown_no_clear_setup(self):
 with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
 s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
 s1 >> t1
 # clearing t1 does not clear s1
-self.cleared_downstream(t1) == {t1}
+assert self.cleared_downstream(t1) == {t1}
 s1 >> w1 >> t1
 # that isn't changed with the introduction of w1
-self.cleared_downstream(t1) == {t1}
+assert self.cleared_downstream(t1) == {t1}
 # though, of course, clearing w1 clears them all
-self.cleared_downstream(w1) == {s1, w1, t1}
+assert self.cleared_downstream(w1) == {s1, w1, t1}
+
+def test_clearing_setup_clears_teardown(self):
+with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
+s1 >> t1
+s1 >> w1 >> t1
+# clearing w1 clears all always
+assert self.cleared_upstream(w1) == {s1, w1, t1}
+assert self.cleared_downstream(w1) == {s1, w1, t1}
+assert self.cleared_neither(w1) == {s1, w1, t1}
+# clearing s1 clears t1 always
+assert self.cleared_upstream(s1) == {s1, t1}
+assert self.cleared_downstream(s1) == {s1, w1, t1}
+assert self.cleared_neither(s1) == {s1, t1}
+
+@pytest.mark.parametrize(
+"upstream, downstream, expected",
+[
+(False, False, {"my_teardown", "my_setup"

[airflow] branch main updated (e781aef1a7 -> 8d2ced5e85)

2023-06-29 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from e781aef1a7 aws waiter util: log status info with error level on waiter 
error (#32247)
 add 8d2ced5e85 Add setup / teardown info to graph_data endpoint (#32268)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/task_group.py  | 16 +++-
 airflow/www/static/js/types/index.ts |  1 +
 2 files changed, 12 insertions(+), 5 deletions(-)



[airflow] branch main updated (05b39cf2ad -> 7b193970f8)

2023-06-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 05b39cf2ad Ignore teardowns and setups when arrowing from groups 
(#32157)
 add 7b193970f8 Check that chain_linear elements has len greater than 1 
(#32061)

No new revisions were added by this update.

Summary of changes:
 airflow/models/baseoperator.py| 7 +++
 tests/models/test_baseoperator.py | 7 +++
 2 files changed, 14 insertions(+)
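
For context, a hedged sketch of `chain_linear`, the helper this check
concerns (dag and task ids are hypothetical):

    import pendulum
    from airflow.models.baseoperator import chain_linear
    from airflow.models.dag import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG(dag_id="chain_linear_example", schedule=None,
             start_date=pendulum.datetime(2023, 1, 1)):
        t1, t2, t3, t4 = (EmptyOperator(task_id=f"t{i}") for i in range(1, 5))
        # each element (a task or a list of tasks) is wired to the next
        # element as a whole: t1 >> [t2, t3], then [t2, t3] >> t4
        chain_linear(t1, [t2, t3], t4)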



[airflow] branch main updated: Ignore teardowns and setups when arrowing from groups (#32157)

2023-06-27 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 05b39cf2ad Ignore teardowns and setups when arrowing from groups 
(#32157)
05b39cf2ad is described below

commit 05b39cf2adb2998c01ff27057aeda585b4320d00
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Jun 27 15:07:18 2023 -0700

Ignore teardowns and setups when arrowing from groups (#32157)
---
 airflow/utils/task_group.py| 17 +++--
 tests/models/test_dag.py   | 21 ++---
 tests/utils/test_task_group.py | 16 +++-
 3 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 4d79d74e24..f20a5032a8 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -365,9 +365,22 @@ class TaskGroup(DAGNode):
 Returns a generator of tasks that are leaf tasks, i.e. those with no 
downstream
 dependencies within the TaskGroup.
 """
+
+def recurse_for_first_non_setup_teardown(group, task):
+for upstream_task in task.upstream_list:
+if not group.has_task(upstream_task):
+continue
+if upstream_task.is_setup or upstream_task.is_teardown:
+yield from recurse_for_first_non_setup_teardown(group, 
upstream_task)
+else:
+yield upstream_task
+
 for task in self:
-if not any(self.has_task(child) for child in 
task.get_direct_relatives(upstream=False)):
-yield task
+if not any(self.has_task(x) for x in 
task.get_direct_relatives(upstream=False)):
+if not (task.is_teardown or task.is_setup):
+yield task
+else:
+yield from recurse_for_first_non_setup_teardown(self, task)
 
 def child_id(self, label):
 """
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9eec01c44a..bd460e6cc7 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3772,13 +3772,20 @@ class TestTaskClearingSetupTeardownBehavior:
 g2_w3 = dag.task_dict["g2.w3"]
 g2_group_teardown = dag.task_dict["g2.group_teardown"]
 
-with pytest.raises(Exception):
-# fix_me
-#   the line `dag_setup >> tg >> dag_teardown` should be 
equivalent to
-#   dag_setup >> group_setup; w3 >> dag_teardown
-#   i.e. not group_teardown >> dag_teardown
-assert g2_group_teardown.downstream_task_ids == {}
-assert g2_w3.downstream_task_ids == {"g2.group_teardown", 
"dag_teardown"}
+# the line `dag_setup >> tg >> dag_teardown` should be equivalent to
+# dag_setup >> group_setup; w3 >> dag_teardown
+# i.e. not group_teardown >> dag_teardown
+# this way the two teardowns can run in parallel
+# so first, check that dag_teardown not downstream of group 2 teardown
+# this means they can run in parallel
+assert "dag_teardown" not in g2_group_teardown.downstream_task_ids
+# and just document that g2 teardown is in effect a dag leaf
+assert g2_group_teardown.downstream_task_ids == set()
+# group 2 task w3 is in the scope of 2 teardowns -- the dag teardown 
and the group teardown
+# it is arrowed to both of them
+assert g2_w3.downstream_task_ids == {"g2.group_teardown", 
"dag_teardown"}
+# dag teardown should have 3 upstreams: the last work task in groups 1 
and 2, and its setup
+assert dag_teardown.upstream_task_ids == {"g1.w3", "g2.w3", 
"dag_setup"}
 
 assert {x.task_id for x in 
g2_w2.get_upstreams_only_setups_and_teardowns()} == {
 "dag_setup",
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index a5262aa2eb..e7088afbd0 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -24,7 +24,8 @@ import pytest
 
 from airflow.decorators import dag, task_group as task_group_decorator
 from airflow.exceptions import TaskAlreadyInTaskGroup
-from airflow.models import DAG
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.dag import DAG
 from airflow.models.xcom_arg import XComArg
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
@@ -1381,3 +1382,16 @@ def 
test_override_dag_default_args_in_multi_level_nested_tg():
 assert task.retries == 1
 assert task.owner == "z"
 assert task.execution_timeou

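To summarize the behavior exercised in the tests above, here is a minimal sketch (hypothetical dag and task ids, EmptyOperator used for brevity): with this change, arrowing from a TaskGroup to a downstream task skips the group's setup and teardown tasks, so the downstream task is wired to the group's last work task.

    import pendulum

    from airflow.models.dag import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.task_group import TaskGroup

    with DAG(dag_id="group_arrow_sketch", start_date=pendulum.datetime(2023, 1, 1)) as dag:
        dag_setup = EmptyOperator(task_id="dag_setup").as_setup()
        dag_teardown = EmptyOperator(task_id="dag_teardown").as_teardown(setups=dag_setup)
        with TaskGroup("g") as tg:
            group_setup = EmptyOperator(task_id="group_setup").as_setup()
            w = EmptyOperator(task_id="w")
            group_teardown = EmptyOperator(task_id="group_teardown").as_teardown(setups=group_setup)
            group_setup >> w >> group_teardown
        dag_setup >> tg >> dag_teardown
        # dag_teardown ends up downstream of "g.w" (the last work task),
        # not "g.group_teardown", so the two teardowns can run in parallel
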
[airflow] branch main updated: Change `as_setup` and `as_teardown` to instance methods (#32053)

2023-06-26 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 016ce99486 Change `as_setup` and `as_teardown` to instance methods 
(#32053)
016ce99486 is described below

commit 016ce9948625a556093b0182439aa50314c651da
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Mon Jun 26 10:32:25 2023 -0700

Change `as_setup` and `as_teardown` to instance methods (#32053)

This provides a number of benefits.
* provides a one-line syntax for setting setup / teardown deps
* makes it easy to convert dags to use the feature
* provides a mechanism to combine "reusable" taskflow tasks with setup / 
teardown
* set setup and teardown in the same place you set deps

-

Co-authored-by: Ephraim Anierobi 
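
For illustration, a minimal sketch of the one-line syntax described above (hypothetical task ids and bash commands; the real usage is in the example-dag diff below):

    import pendulum

    from airflow.models.dag import DAG
    from airflow.operators.bash import BashOperator

    with DAG(dag_id="setup_teardown_sketch", start_date=pendulum.datetime(2023, 1, 1)) as dag:
        create = BashOperator(task_id="create", bash_command="echo create").as_setup()
        work = BashOperator(task_id="work", bash_command="echo work")
        # as_teardown(setups=create) marks the task as teardown, marks `create` as setup,
        # and arrows create >> cleanup in a single call
        cleanup = BashOperator(task_id="cleanup", bash_command="echo cleanup").as_teardown(setups=create)
        create >> work >> cleanup
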
---
 airflow/example_dags/example_setup_teardown.py |  16 +-
 .../example_setup_teardown_taskflow.py |  65 --
 airflow/models/abstractoperator.py |  94 +++-
 airflow/models/baseoperator.py |  35 ---
 airflow/models/mappedoperator.py   |  26 ++-
 airflow/models/taskmixin.py|  15 ++
 airflow/models/xcom_arg.py |  53 -
 airflow/serialization/serialized_objects.py|   3 +-
 tests/decorators/test_setup_teardown.py|  63 +++---
 tests/models/test_dag.py   |  16 +-
 tests/models/test_taskinstance.py  |   2 +-
 tests/models/test_taskmixin.py | 248 +
 tests/serialization/test_dag_serialization.py  |  16 +-
 tests/ti_deps/deps/test_trigger_rule_dep.py|   2 +-
 14 files changed, 534 insertions(+), 120 deletions(-)

diff --git a/airflow/example_dags/example_setup_teardown.py 
b/airflow/example_dags/example_setup_teardown.py
index 77d7d5bdc6..59aba9753a 100644
--- a/airflow/example_dags/example_setup_teardown.py
+++ b/airflow/example_dags/example_setup_teardown.py
@@ -30,21 +30,19 @@ with DAG(
 catchup=False,
 tags=["example"],
 ) as dag:
-root_setup = BashOperator.as_setup(task_id="root_setup", 
bash_command="echo 'Hello from root_setup'")
+root_setup = BashOperator(task_id="root_setup", bash_command="echo 'Hello 
from root_setup'").as_setup()
 root_normal = BashOperator(task_id="normal", bash_command="echo 'I am just 
a normal task'")
-root_teardown = BashOperator.as_teardown(
+root_teardown = BashOperator(
 task_id="root_teardown", bash_command="echo 'Goodbye from 
root_teardown'"
-)
+).as_teardown(setups=root_setup)
 root_setup >> root_normal >> root_teardown
-root_setup >> root_teardown
 with TaskGroup("section_1") as section_1:
-inner_setup = BashOperator.as_setup(
+inner_setup = BashOperator(
 task_id="taskgroup_setup", bash_command="echo 'Hello from 
taskgroup_setup'"
-)
+).as_setup()
 inner_normal = BashOperator(task_id="normal", bash_command="echo 'I am 
just a normal task'")
-inner_teardown = BashOperator.as_teardown(
+inner_teardown = BashOperator(
 task_id="taskgroup_teardown", bash_command="echo 'Hello from 
taskgroup_teardown'"
-)
+).as_teardown(setups=inner_setup)
 inner_setup >> inner_normal >> inner_teardown
-inner_setup >> inner_teardown
 root_normal >> section_1
diff --git a/airflow/example_dags/example_setup_teardown_taskflow.py 
b/airflow/example_dags/example_setup_teardown_taskflow.py
index 245cc6a2e9..128534f1d2 100644
--- a/airflow/example_dags/example_setup_teardown_taskflow.py
+++ b/airflow/example_dags/example_setup_teardown_taskflow.py
@@ -29,30 +29,61 @@ with DAG(
 catchup=False,
 tags=["example"],
 ) as dag:
-# You can use the setup and teardown decorators to add setup and teardown 
tasks at the DAG level
-@setup
+
 @task
-def root_setup():
-print("Hello from root_setup")
+def task_1():
+print("Hello 1")
 
-@teardown
 @task
-def root_teardown():
-print("Goodbye from root_teardown")
+def task_2():
+print("Hello 2")
+
+@task
+def task_3():
+print("Hello 3")
+
+# you can set setup / teardown relationships with the `as_teardown` method.
+t1 = task_1()
+t2 = task_2()
+t3 = task_3()
+t1 >> t2 >> t3.as_teardown(setups=t1)
+
+# the method `as_teardown` will mark t3 as teardown, t1 as setup, and 
arrow t1 >> t3
+# now if you clear t2 (downstrea

[airflow] branch main updated: Call setup / teardown validation in dagbag load (#32062)

2023-06-22 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new b99f1b1b2c Call setup / teardown validation in dagbag load (#32062)
b99f1b1b2c is described below

commit b99f1b1b2c9fe166a8b2c080c473b02c89d0c2b8
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Jun 22 09:36:24 2023 -0700

Call setup / teardown validation in dagbag load (#32062)
---
 .../example_setup_teardown_taskflow.py | 13 ++-
 airflow/models/dag.py  |  1 +
 tests/models/test_dag.py   | 26 ++
 3 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/airflow/example_dags/example_setup_teardown_taskflow.py 
b/airflow/example_dags/example_setup_teardown_taskflow.py
index 7e47475d16..245cc6a2e9 100644
--- a/airflow/example_dags/example_setup_teardown_taskflow.py
+++ b/airflow/example_dags/example_setup_teardown_taskflow.py
@@ -61,10 +61,13 @@ with DAG(
 def hello():
 print("I say hello")
 
-my_setup()
-hello()
-my_teardown()
+s = my_setup()
+w = hello()
+t = my_teardown()
+s >> w >> t
+s >> t
 
-root_setup()
+rs = root_setup()
 normal() >> section_1()
-root_teardown()
+rt = root_teardown()
+rs >> rt
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index f0c513e41c..906ae989e2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -685,6 +685,7 @@ class DAG(LoggingMixin):
 )
 self.params.validate()
 self.timetable.validate()
+self.validate_setup_teardown()
 
 def validate_setup_teardown(self):
 """
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9d510fa428..fa2bcd7052 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3805,3 +3805,29 @@ class TestTaskClearingSetupTeardownBehavior:
 "g2.w1",
 "g2.w2",
 }
+
+def test_clear_upstream_not_your_setup(self):
+"""
+When you have a work task that comes after a setup, then if you clear 
upstream
+the setup (and its teardown) will be cleared even though strictly 
speaking you don't
+"require" it since, depending on speed of execution, it might be torn 
down by t1
+before / while w2 runs.  It just gets cleared by virtue of it being 
upstream, and
+that's what you requested.  And its teardown gets cleared too.  But 
w1 doesn't.
+"""
+with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+s1 >> w1 >> t1
+s1 >> w2
+self.cleared_upstream(w2) == {s1, w2, t1}
+
+def clearing_teardown_no_clear_setup(self):
+with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
+s1 >> t1
+# clearing t1 does not clear s1
+self.cleared_downstream(t1) == {t1}
+s1 >> w1 >> t1
+# that isn't changed with the introduction of w1
+self.cleared_downstream(t1) == {t1}
+# though, of course, clearing w1 clears them all
+self.cleared_downstream(w1) == {s1, w1, t1}



[airflow] branch main updated: Mark celery redis integration test as flaky (#32078)

2023-06-22 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new fa81c3ad4c Mark celery redis integration test as flaky (#32078)
fa81c3ad4c is described below

commit fa81c3ad4c7ed83ee3aa6375dcbf3235f3663287
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Jun 22 09:33:26 2023 -0700

Mark celery redis integration test as flaky (#32078)
---
 tests/integration/executors/test_celery_executor.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/integration/executors/test_celery_executor.py 
b/tests/integration/executors/test_celery_executor.py
index 6923bb73fd..4be7633c3e 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -101,6 +101,7 @@ class TestCeleryExecutor:
 db.clear_db_runs()
 db.clear_db_jobs()
 
+@pytest.mark.flaky(reruns=3)
 @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
 def test_celery_integration(self, broker_url):
 success_command = ["airflow", "tasks", "run", "true", "some_parameter"]



[airflow] branch main updated: Add version of `chain` which doesn't require matched lists (#31927)

2023-06-20 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 8f65aee8f3 Add version of `chain` which doesn't require matched lists 
(#31927)
8f65aee8f3 is described below

commit 8f65aee8f3c4dfdd6c4195d97f57f4267d37c209
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Jun 20 22:26:59 2023 -0700

Add version of `chain` which doesn't require matched lists (#31927)

Co-authored-by: Tzu-ping Chung 
---
 airflow/models/baseoperator.py| 32 
 tests/models/test_baseoperator.py | 51 ++-
 2 files changed, 82 insertions(+), 1 deletion(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 84aa9a17eb..79d4637387 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -85,6 +85,7 @@ from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
 from airflow.utils.context import Context
 from airflow.utils.decorators import fixup_decorator_warning_stack
+from airflow.utils.edgemodifier import EdgeModifier
 from airflow.utils.helpers import validate_key
 from airflow.utils.operator_resources import Resources
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -1838,6 +1839,37 @@ def cross_downstream(
 task.set_downstream(to_tasks)
 
 
+def chain_linear(*elements: DependencyMixin | Sequence[DependencyMixin]):
+"""
+Helper to simplify task dependency definition.
+
+E.g.: suppose you want precedence like so::
+
+        ╭─op2─╮ ╭─op4─╮
+    op1─┤     ├─┤ op5 ├─op7
+        ╰-op3─╯ ╰-op6─╯
+
+Then you can accomplish like so::
+
+chain_linear(
+op1,
+[op2, op3],
+[op4, op5, op6],
+op7
+)
+
+:param elements: a list of operators / lists of operators
+"""
+prev_elem = None
+for curr_elem in elements:
+if isinstance(curr_elem, EdgeModifier):
+raise ValueError("Labels are not supported by chain_linear")
+if prev_elem is not None:
+for task in prev_elem:
+task >> curr_elem
+prev_elem = [curr_elem] if isinstance(curr_elem, DependencyMixin) else 
curr_elem
+
+
 # pyupgrade assumes all type annotations can be lazily evaluated, but this is
 # not the case for attrs-decorated classes, since cattrs needs to evaluate the
 # annotation expressions at runtime, and Python before 3.9.0 does not lazily
diff --git a/tests/models/test_baseoperator.py 
b/tests/models/test_baseoperator.py
index 418ffdba72..82e9dd71c3 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -31,7 +31,13 @@ from airflow.decorators import task as task_decorator
 from airflow.exceptions import AirflowException, DagInvalidTriggerRule, 
RemovedInAirflow3Warning
 from airflow.lineage.entities import File
 from airflow.models import DAG
-from airflow.models.baseoperator import BaseOperator, BaseOperatorMeta, chain, 
cross_downstream
+from airflow.models.baseoperator import (
+BaseOperator,
+BaseOperatorMeta,
+chain,
+chain_linear,
+cross_downstream,
+)
 from airflow.utils.context import Context
 from airflow.utils.edgemodifier import Label
 from airflow.utils.task_group import TaskGroup
@@ -515,6 +521,49 @@ class TestBaseOperator:
 assert [op2] == tgop3.get_direct_relatives(upstream=False)
 assert [op2] == tgop4.get_direct_relatives(upstream=False)
 
+def test_chain_linear(self):
+dag = DAG(dag_id="test_chain_linear", start_date=datetime.now())
+
+t1, t2, t3, t4, t5, t6, t7 = (BaseOperator(task_id=f"t{i}", dag=dag) 
for i in range(1, 8))
+chain_linear(t1, [t2, t3, t4], [t5, t6], t7)
+
+assert set(t1.get_direct_relatives(upstream=False)) == {t2, t3, t4}
+assert set(t2.get_direct_relatives(upstream=False)) == {t5, t6}
+assert set(t3.get_direct_relatives(upstream=False)) == {t5, t6}
+assert set(t7.get_direct_relatives(upstream=True)) == {t5, t6}
+
+t1, t2, t3, t4, t5, t6 = (
+task_decorator(task_id=f"xcomarg_task{i}", python_callable=lambda: 
None, dag=dag)()
+for i in range(1, 7)
+)
+chain_linear(t1, [t2, t3], [t4, t5], t6)
+
+assert set(t1.operator.get_direct_relatives(upstream=False)) == 
{t2.operator, t3.operator}
+assert set(t2.operator.get_direct_relatives(upstream=False)) == 
{t4.operator, t5.operator}
+assert set(t3.operator.get_direct_relatives(upstream=False)) == 
{t4.operator, t5.operator}
+assert set(t6.operator.get_direct_relatives(upstream=True)) == 
{t4.operator, t5.operator}
+
+# Begin test

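For contrast with the plain `chain` helper, which requires list arguments of matching length and pairs them element-wise, here is a minimal `chain_linear` sketch mirroring the test above (the dag id is hypothetical):

    from datetime import datetime

    from airflow.models.baseoperator import BaseOperator, chain_linear
    from airflow.models.dag import DAG

    dag = DAG(dag_id="chain_linear_sketch", start_date=datetime(2023, 1, 1))
    t1, t2, t3, t4, t5, t6, t7 = (BaseOperator(task_id=f"t{i}", dag=dag) for i in range(1, 8))

    # group sizes do not need to match; every task in a group is made upstream
    # of every task in the following group
    chain_linear(t1, [t2, t3, t4], [t5, t6], t7)
    # resulting edges: t1 >> {t2, t3, t4}; each of t2, t3, t4 >> {t5, t6}; {t5, t6} >> t7
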
[airflow] branch main updated: Replace State usages with strong-typed enums (#31735)

2023-06-16 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new b459af3ee0 Replace State usages with strong-typed enums (#31735)
b459af3ee0 is described below

commit b459af3ee0f94cd246e3d401ca3eec18ffd85db0
Author: Tzu-ping Chung 
AuthorDate: Sat Jun 17 04:48:57 2023 +0800

Replace State usages with strong-typed enums (#31735)

Only in the main Airflow code base. There are many more in tests that I 
might tackle some day.
Additionally, there are some cases where TI state is used for "job" state.  
We may deal with this later by introducing a new type ExecutorState.
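
A minimal before/after sketch of the pattern the diffstat below summarizes (the concrete hunks, e.g. in delete_dag.py, follow further down):

    from airflow.utils.state import State, TaskInstanceState

    # before: comparisons against the catch-all State namespace
    before = State.RUNNING
    # after: the task-instance-specific enum (same underlying "running" value)
    after = TaskInstanceState.RUNNING
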
---
 airflow/api/common/delete_dag.py   |  4 +-
 airflow/api/common/mark_tasks.py   | 30 +---
 .../endpoints/task_instance_endpoint.py|  4 +-
 airflow/api_connexion/schemas/enum_schemas.py  |  4 +-
 airflow/dag_processing/processor.py| 16 +--
 airflow/executors/base_executor.py | 10 ++--
 airflow/executors/celery_executor.py   |  8 ++--
 airflow/executors/celery_executor_utils.py |  8 ++--
 airflow/executors/debug_executor.py| 28 +--
 airflow/executors/kubernetes_executor.py   | 24 ++
 airflow/executors/local_executor.py| 16 +++
 airflow/executors/sequential_executor.py   |  6 +--
 airflow/jobs/backfill_job_runner.py|  6 +--
 airflow/jobs/job.py|  7 ++-
 airflow/jobs/local_task_job_runner.py  |  6 +--
 airflow/jobs/scheduler_job_runner.py   | 28 +--
 airflow/listeners/spec/taskinstance.py |  6 +--
 airflow/models/dag.py  | 18 
 airflow/models/dagrun.py   | 42 -
 airflow/models/pool.py |  8 ++--
 airflow/models/skipmixin.py|  4 +-
 airflow/models/taskinstance.py | 54 +++---
 airflow/operators/subdag.py| 20 
 airflow/operators/trigger_dagrun.py|  6 +--
 airflow/sensors/external_task.py   | 28 +--
 airflow/sentry.py  |  4 +-
 airflow/ti_deps/dependencies_states.py | 30 ++--
 airflow/ti_deps/deps/dagrun_exists_dep.py  |  4 +-
 airflow/ti_deps/deps/not_in_retry_period_dep.py|  4 +-
 airflow/ti_deps/deps/not_previously_skipped_dep.py |  4 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py|  4 +-
 airflow/ti_deps/deps/ready_to_reschedule.py|  4 +-
 airflow/ti_deps/deps/task_not_running_dep.py   |  4 +-
 airflow/utils/dot_renderer.py  |  2 +-
 airflow/utils/log/file_task_handler.py |  5 +-
 airflow/utils/log/log_reader.py| 23 +
 airflow/utils/state.py |  9 +---
 airflow/www/utils.py   | 10 ++--
 airflow/www/views.py   | 26 +--
 39 files changed, 280 insertions(+), 244 deletions(-)

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index 45611729ea..1d879a667a 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -29,7 +29,7 @@ from airflow.models import DagModel, TaskFail
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.db import get_sqla_model_classes
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.state import State
+from airflow.utils.state import TaskInstanceState
 
 log = logging.getLogger(__name__)
 
@@ -50,7 +50,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, 
session: Session =
 running_tis = session.scalar(
 select(models.TaskInstance.state)
 .where(models.TaskInstance.dag_id == dag_id)
-.where(models.TaskInstance.state == State.RUNNING)
+.where(models.TaskInstance.state == TaskInstanceState.RUNNING)
 .limit(1)
 )
 if running_tis:
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index b965237bdf..184251b515 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -155,7 +155,7 @@ def set_state(
 for task_instance in tis_altered:
 # The try_number was decremented when setting to up_for_reschedule 
and deferred.
 # Increment it back when changing the state again
-if task_instance.state in [State.DEFERRED, 
State.UP_FOR_RESCHEDULE]:
+if task_instance.state in (TaskInstanceState.DEFERRED, 
TaskInstanceState.UP_FOR_RESCHEDULE):
 task_instance._try_n

[airflow] branch main updated: Upgrade ruff to 0.272 (#31966)

2023-06-16 Thread dstandish
This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new fc0e5a4d42 Upgrade ruff to 0.272 (#31966)
fc0e5a4d42 is described below

commit fc0e5a4d42ee882ca5bc20ea65be38b2c739644d
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Fri Jun 16 11:30:18 2023 -0700

Upgrade ruff to 0.272 (#31966)
---
 .pre-commit-config.yaml  |  4 ++--
 airflow/example_dags/example_sensor_decorator.py |  1 -
 airflow/example_dags/tutorial_taskflow_api.py|  1 -
 airflow/jobs/triggerer_job_runner.py | 10 +-
 airflow/models/dag.py|  3 +--
 airflow/models/dagbag.py |  1 -
 airflow/models/taskmixin.py  |  1 -
 airflow/utils/dag_cycle_tester.py|  4 ++--
 tests/jobs/test_scheduler_job.py |  6 +++---
 9 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0430658d17..c3447d2e3c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -187,8 +187,8 @@ repos:
 pass_filenames: true
 # Since ruff makes use of multiple cores we _purposefully_ don't run 
this in docker so it can use the
 # host CPU to it's fullest
-entry: ruff --fix --no-update-check --force-exclude
-additional_dependencies: ['ruff==0.0.265']
+entry: ruff --fix --force-exclude
+additional_dependencies: ['ruff==0.0.272']
 files: \.pyi?$
 exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py
   - repo: https://github.com/asottile/blacken-docs
diff --git a/airflow/example_dags/example_sensor_decorator.py 
b/airflow/example_dags/example_sensor_decorator.py
index 62c5d94088..db3059398c 100644
--- a/airflow/example_dags/example_sensor_decorator.py
+++ b/airflow/example_dags/example_sensor_decorator.py
@@ -27,7 +27,6 @@ import pendulum
 from airflow.decorators import dag, task
 from airflow.sensors.base import PokeReturnValue
 
-
 # [END import_module]
 
 
diff --git a/airflow/example_dags/tutorial_taskflow_api.py 
b/airflow/example_dags/tutorial_taskflow_api.py
index 27a28f4b79..f41f729af8 100644
--- a/airflow/example_dags/tutorial_taskflow_api.py
+++ b/airflow/example_dags/tutorial_taskflow_api.py
@@ -25,7 +25,6 @@ import pendulum
 
 from airflow.decorators import dag, task
 
-
 # [END import_module]
 
 
diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
index 5f4b77cefa..8719f4ea95 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -28,7 +28,7 @@ from collections import deque
 from contextlib import suppress
 from copy import copy
 from queue import SimpleQueue
-from typing import TYPE_CHECKING, Deque
+from typing import TYPE_CHECKING
 
 from sqlalchemy import func
 
@@ -429,16 +429,16 @@ class TriggerRunner(threading.Thread, LoggingMixin):
 trigger_cache: dict[str, type[BaseTrigger]]
 
 # Inbound queue of new triggers
-to_create: Deque[tuple[int, BaseTrigger]]
+to_create: deque[tuple[int, BaseTrigger]]
 
 # Inbound queue of deleted triggers
-to_cancel: Deque[int]
+to_cancel: deque[int]
 
 # Outbound queue of events
-events: Deque[tuple[int, TriggerEvent]]
+events: deque[tuple[int, TriggerEvent]]
 
 # Outbound queue of failed triggers
-failed_triggers: Deque[tuple[int, BaseException]]
+failed_triggers: deque[tuple[int, BaseException]]
 
 # Should-we-stop flag
 stop: bool = False
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index a816c9548b..b7361d6f3b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -39,7 +39,6 @@ from typing import (
 Any,
 Callable,
 Collection,
-Deque,
 Iterable,
 Iterator,
 List,
@@ -3784,7 +3783,7 @@ class DagContext:
 
 """
 
-_context_managed_dags: Deque[DAG] = deque()
+_context_managed_dags: collections.deque[DAG] = deque()
 autoregistered_dags: set[tuple[DAG, ModuleType]] = set()
 current_autoregister_module_name: str | None = None
 
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 02055b5e94..b422d4699e 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -101,7 +101,6 @@ class DagBag(LoggingMixin):
 collect_dags: bool = True,
 ):
 # Avoid circular import
-from airflow.models.dag import DAG
 
 super().__init__()
 
diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py
index a858ce942f..0c1c94b7b8 100644
--- a/airflow/models/taskmixin.py
+++ b/airflow/models/taskmixin.py
@@ -174,7 +174,6 @@ class DAGNode(DependencyMixin, metaclass=ABCMeta):
 """Sets relatives for the
