Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1848746052 ## airflow/api_connexion/schemas/task_instance_schema.py: ## @@ -212,8 +210,8 @@ class SetTaskInstanceStateFormSchema(Schema): @validates_schema def validate_form(self, data, **kwargs): """Validate set task instance state form.""" -if not exactly_one(data.get("logical_date"), data.get("dag_run_id")): -raise ValidationError("Exactly one of logical_date or dag_run_id must be provided") +if not data.get("dag_run_id"): +raise ValidationError("dag_run_id must be provided") Review Comment: We don’t need this if we set `dag_run_id = fields.Str(required=True)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1848815726 ## airflow/api_connexion/schemas/task_instance_schema.py: ## @@ -212,8 +210,8 @@ class SetTaskInstanceStateFormSchema(Schema): @validates_schema def validate_form(self, data, **kwargs): """Validate set task instance state form.""" -if not exactly_one(data.get("logical_date"), data.get("dag_run_id")): -raise ValidationError("Exactly one of logical_date or dag_run_id must be provided") +if not data.get("dag_run_id"): +raise ValidationError("dag_run_id must be provided") Review Comment: Thanks! Changed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1848530608 ## airflow/ti_deps/deps/exec_date_after_start_date_dep.py: ## @@ -24,23 +24,23 @@ class ExecDateAfterStartDateDep(BaseTIDep): Review Comment: part of other PR ## providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py: ## @@ -38,9 +38,14 @@ from kubernetes.dynamic import DynamicClient from sqlalchemy import or_, select, update +try: +from airflow.cli.cli_config import ARG_LOGICAL_DATE +except ImportError: # 2.x compatibility. +from airflow.cli.cli_config import ( # type: ignore[attr-defined, no-redef] +ARG_EXECUTION_DATE as ARG_LOGICAL_DATE, +) Review Comment: Part of other PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1848529847 ## airflow/cli/commands/task_command.py: ## @@ -112,49 +147,48 @@ def _get_dag_run( the logical date; otherwise use it as a run ID and set the logical date to the current time. """ -if not exec_date_or_run_id and not create_if_necessary: -raise ValueError("Must provide `exec_date_or_run_id` if not `create_if_necessary`.") -execution_date: pendulum.DateTime | None = None -if exec_date_or_run_id: -dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, run_id=exec_date_or_run_id, session=session) -if dag_run: -return dag_run, False -with suppress(ParserError, TypeError): -execution_date = timezone.parse(exec_date_or_run_id) -if execution_date: -dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, execution_date=execution_date, session=session) -if dag_run: +if not logical_date_or_run_id and not create_if_necessary: +raise ValueError("Must provide `logical_date_or_run_id` if not `create_if_necessary`.") + +logical_date = None +if logical_date_or_run_id: +dag_run, logical_date = _fetch_dag_run_from_run_id_or_logical_date_string( +dag_id=dag.dag_id, +value=logical_date_or_run_id, +session=session, +) +if dag_run is not None: return dag_run, False elif not create_if_necessary: raise DagRunNotFound( -f"DagRun for {dag.dag_id} with run_id or execution_date " -f"of {exec_date_or_run_id!r} not found" +f"DagRun for {dag.dag_id} with run_id or logical_date " +f"of {logical_date_or_run_id!r} not found" ) -if execution_date is not None: -dag_run_execution_date = execution_date +if logical_date is not None: +dag_run_logical_date = logical_date else: -dag_run_execution_date = pendulum.instance(timezone.utcnow()) +dag_run_logical_date = pendulum.instance(timezone.utcnow()) if create_if_necessary == "memory": dag_run = DagRun( dag_id=dag.dag_id, -run_id=exec_date_or_run_id, -execution_date=dag_run_execution_date, - 
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date), +run_id=logical_date_or_run_id, +logical_date=dag_run_logical_date, + data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date), triggered_by=DagRunTriggeredByType.CLI, ) return dag_run, True elif create_if_necessary == "db": dag_run = dag.create_dagrun( state=DagRunState.QUEUED, -execution_date=dag_run_execution_date, +logical_date=dag_run_logical_date, run_id=_generate_temporary_run_id(), - data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date), + data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date), session=session, triggered_by=DagRunTriggeredByType.CLI, ) -return dag_run, True +return dag_run, True # type: ignore[return-value] Review Comment: removed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1848528370 ## airflow/api_connexion/endpoints/task_instance_endpoint.py: ## @@ -522,19 +522,10 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION if not task: error_message = f"Task ID {task_id} not found" raise NotFound(error_message) - -logical_date = data.get("logical_date") Review Comment: removed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1848077091 ## airflow/www/views.py: ## @@ -1417,7 +1417,9 @@ def rendered_templates(self, session): logger.info("Retrieving rendered templates.") dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id) -dag_run = dag.get_dagrun(logical_date=dttm) +dag_run = dag.get_dagrun( +select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1) Review Comment: Added it ## airflow/www/views.py: ## @@ -2144,7 +2146,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): form=form, ) -dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, logical_date=logical_date) +dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id) Review Comment: Changed it ## tests/sensors/test_external_task_sensor.py: ## @@ -1373,9 +1374,17 @@ def test_external_task_marker_clear_activate(dag_bag_parent_child, session): run_tasks(dag_bag, logical_date=day_1) run_tasks(dag_bag, logical_date=day_2) +from sqlalchemy import select + # Assert that dagruns of all the affected dags are set to SUCCESS before tasks are cleared. for dag, logical_date in itertools.product(dag_bag.dags.values(), [day_1, day_2]): -dagrun = dag.get_dagrun(logical_date=logical_date, session=session) +dagrun = dag.get_dagrun( +run_id=select(DagRun.run_id) +.where(DagRun.logical_date == logical_date) +.order_by(DagRun.id.desc()) +.limit(1), +session=session, +) Review Comment: Done for all of them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847616824 ## providers/tests/google/common/hooks/test_base_google.py: ## Review Comment: Same, not needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847616575 ## providers/tests/google/cloud/operators/test_bigquery_dts.py: ## Review Comment: Changes in this file are not needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
Lee-W commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847630397 ## airflow/www/views.py: ## @@ -1417,7 +1417,9 @@ def rendered_templates(self, session): logger.info("Retrieving rendered templates.") dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id) -dag_run = dag.get_dagrun(logical_date=dttm) +dag_run = dag.get_dagrun( +select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1) Review Comment: Yep, this is the only place where we did not use `run_id=` and `session`; it was probably just missed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847635422 ## tests/www/views/test_views.py: ## @@ -396,8 +396,8 @@ def get_task_instance(session, task): # task_2 remains as SUCCESS assert get_task_instance(session, task_2).state == State.SUCCESS # task_3 and task_4 are cleared because they were in FAILED/UPSTREAM_FAILED state -assert get_task_instance(session, task_3).state == State.NONE -assert get_task_instance(session, task_4).state == State.NONE +assert get_task_instance(session, task_3).state == State.UPSTREAM_FAILED +assert get_task_instance(session, task_4).state == State.FAILED Review Comment: These should not change, something is not right in the implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847631286 ## tests/models/test_dag.py: ## @@ -2671,8 +2663,8 @@ def get_ti_from_db(task): # task_2 remains as SUCCESS assert get_ti_from_db(task_2).state == State.SUCCESS # task_3 and task_4 are cleared because they were in FAILED/UPSTREAM_FAILED state -assert get_ti_from_db(task_3).state == State.NONE -assert get_ti_from_db(task_4).state == State.NONE +assert get_ti_from_db(task_3).state == State.UPSTREAM_FAILED Review Comment: Yeah these states should not change; something is wrong here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
Lee-W commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847630397 ## airflow/www/views.py: ## @@ -1417,7 +1417,9 @@ def rendered_templates(self, session): logger.info("Retrieving rendered templates.") dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id) -dag_run = dag.get_dagrun(logical_date=dttm) +dag_run = dag.get_dagrun( +select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1) Review Comment: yep, this one is the only place, we did not use `run_id=` ## airflow/www/views.py: ## @@ -1417,7 +1417,9 @@ def rendered_templates(self, session): logger.info("Retrieving rendered templates.") dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id) -dag_run = dag.get_dagrun(logical_date=dttm) +dag_run = dag.get_dagrun( +select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1) Review Comment: yep, this one is the only place, we did not use `run_id=`, probably just missed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847616218 ## airflow/www/views.py: ## @@ -2144,7 +2146,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): form=form, ) -dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, logical_date=logical_date) +dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id) Review Comment: Same, it’d be a good idea to pass session in here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847615440 ## airflow/models/slamiss.py: ## Review Comment: This file should not be added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847616027 ## airflow/www/views.py: ## @@ -1417,7 +1417,9 @@ def rendered_templates(self, session): logger.info("Retrieving rendered templates.") dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id) -dag_run = dag.get_dagrun(logical_date=dttm) +dag_run = dag.get_dagrun( +select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1) Review Comment: We don’t _need_ to, but should. Similar to the `dag.get_dagrun` call below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847615530 ## airflow/settings.py: ## @@ -831,6 +831,10 @@ def is_usage_data_collection_enabled() -> bool: ALLOW_FUTURE_LOGICAL_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False) +ALLOW_TRIGGER_DAGRUN_IN_FUTURE = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False) + +ALLOW_TRIGGER_DAGRUN_IN_FUTURE = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False) + Review Comment: Rebase error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847608478 ## airflow/cli/commands/task_command.py: ## @@ -91,19 +91,54 @@ def _generate_temporary_run_id() -> str: return f"__airflow_temporary_run_{timezone.utcnow().isoformat()}__" +def _fetch_dag_run_from_run_id_or_logical_date_string( +*, +dag_id: str, +value: str, Review Comment: We use `logical_date_or_run_id` in many places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
uranusjr commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847608031 ## airflow/api_connexion/endpoints/task_instance_endpoint.py: ## @@ -522,19 +522,10 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION if not task: error_message = f"Task ID {task_id} not found" raise NotFound(error_message) - -logical_date = data.get("logical_date") Review Comment: `logical_date` should also be removed from `SetTaskInstanceStateFormSchema` (where this key came from) and the API spec. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
Lee-W commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1847584414 ## airflow/www/views.py: ## @@ -1417,7 +1417,9 @@ def rendered_templates(self, session): logger.info("Retrieving rendered templates.") dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id) -dag_run = dag.get_dagrun(logical_date=dttm) +dag_run = dag.get_dagrun( +select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1) Review Comment: ```suggestion run_id=select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1) ``` ## tests/models/test_dag.py: ## @@ -2671,8 +2663,8 @@ def get_ti_from_db(task): # task_2 remains as SUCCESS assert get_ti_from_db(task_2).state == State.SUCCESS # task_3 and task_4 are cleared because they were in FAILED/UPSTREAM_FAILED state -assert get_ti_from_db(task_3).state == State.NONE -assert get_ti_from_db(task_4).state == State.NONE +assert get_ti_from_db(task_3).state == State.UPSTREAM_FAILED Review Comment: May I know why we're changing the expected values here? ## airflow/www/views.py: ## @@ -1417,7 +1417,9 @@ def rendered_templates(self, session): logger.info("Retrieving rendered templates.") dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id) -dag_run = dag.get_dagrun(logical_date=dttm) +dag_run = dag.get_dagrun( +select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1) Review Comment: do we need to pass session here? ## tests/sensors/test_external_task_sensor.py: ## @@ -1373,9 +1374,17 @@ def test_external_task_marker_clear_activate(dag_bag_parent_child, session): run_tasks(dag_bag, logical_date=day_1) run_tasks(dag_bag, logical_date=day_2) +from sqlalchemy import select + # Assert that dagruns of all the affected dags are set to SUCCESS before tasks are cleared. 
for dag, logical_date in itertools.product(dag_bag.dags.values(), [day_1, day_2]): -dagrun = dag.get_dagrun(logical_date=logical_date, session=session) +dagrun = dag.get_dagrun( +run_id=select(DagRun.run_id) +.where(DagRun.logical_date == logical_date) +.order_by(DagRun.id.desc()) +.limit(1), +session=session, +) Review Comment: ```suggestion run_id = ( select(DagRun.run_id) .where(DagRun.logical_date == logical_date) .order_by(DagRun.id.desc()) .limit(1) ) dagrun = dag.get_dagrun( run_id=run_id, session=session, ) ``` Splitting it into two variables might make it easier to read. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1846056467 ## airflow/models/dag.py: ## @@ -1419,24 +1399,8 @@ def clear( session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), +exclude_run_ids: frozenset[str] | None = frozenset(), ) -> int | Iterable[TaskInstance]: -""" Review Comment: Added it back -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on PR #42404: URL: https://github.com/apache/airflow/pull/42404#issuecomment-2469302913 The following PR: [https://github.com/apache/airflow/pull/43902](https://github.com/apache/airflow/pull/43902) renames the `execution_date` to `logical_date` across the codebase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
Lee-W commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1836006380 ## airflow/models/dag.py: ## @@ -1600,23 +1564,23 @@ def add_logger_if_needed(ti: TaskInstance): exit_stack.callback(lambda: secrets_backend_list.pop(0)) with exit_stack: -execution_date = execution_date or timezone.utcnow() +logical_date = logical_date or timezone.utcnow() self.validate() -self.log.debug("Clearing existing task instances for execution date %s", execution_date) +self.log.debug("Clearing existing task instances for execution date %s", logical_date) Review Comment: Sounds good 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835991045 ## airflow/www/views.py: ## @@ -1654,10 +1666,10 @@ def get_logs_with_metadata(self, session: Session = NEW_SESSION): # Convert string datetime into actual datetime try: -execution_date = timezone.parse(execution_date_str, strict=True) +logical_date = timezone.parse(logical_date_str, strict=True) except ValueError: error_message = ( -f"Given execution date {execution_date_str!r} could not be identified as a date. " +f"Given execution date {logical_date_str!r} could not be identified as a date. " Review Comment: Changed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835989800 ## airflow/models/slamiss.py: ## @@ -14,25 +14,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - Review Comment: Thanks. This file has to be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835990646 ## airflow/www/views.py: ## @@ -221,16 +221,16 @@ def get_safe_url(url): def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): """Get Execution Data, Base Date & Number of runs from a Request.""" -date_time = www_request.args.get("execution_date") +date_time = www_request.args.get("logical_date") run_id = www_request.args.get("run_id") -# First check run id, then check execution date, if not fall back on the latest dagrun +# First check run id, then check logical_date date, if not fall back on the latest dagrun Review Comment: Changed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835988764 ## airflow/models/log.py: ## @@ -73,7 +73,7 @@ def __init__( if task_instance: self.dag_id = task_instance.dag_id self.task_id = task_instance.task_id -if execution_date := getattr(task_instance, "execution_date", None): +if execution_date := getattr(task_instance, "logical_date", None): self.execution_date = execution_date Review Comment: Yes, because in the log model we still use `execution_date` in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835988369 ## airflow/models/dag.py: ## @@ -2439,24 +2403,24 @@ def _get_or_create_dagrun( :param dag: DAG to be used to find run. :param conf: Configuration to pass to newly created run. :param start_date: Start date of new run. -:param execution_date: Logical date for finding an existing run. +:param logical_date: Logical date for finding an existing run. Review Comment: Sure. I will change them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835987661 ## airflow/models/dag.py: ## @@ -1600,23 +1564,23 @@ def add_logger_if_needed(ti: TaskInstance): exit_stack.callback(lambda: secrets_backend_list.pop(0)) with exit_stack: -execution_date = execution_date or timezone.utcnow() +logical_date = logical_date or timezone.utcnow() self.validate() -self.log.debug("Clearing existing task instances for execution date %s", execution_date) +self.log.debug("Clearing existing task instances for execution date %s", logical_date) Review Comment: This can be changed in next PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835987493 ## airflow/models/dag.py: ## @@ -1600,23 +1564,23 @@ def add_logger_if_needed(ti: TaskInstance): exit_stack.callback(lambda: secrets_backend_list.pop(0)) with exit_stack: -execution_date = execution_date or timezone.utcnow() +logical_date = logical_date or timezone.utcnow() self.validate() -self.log.debug("Clearing existing task instances for execution date %s", execution_date) +self.log.debug("Clearing existing task instances for execution date %s", logical_date) Review Comment: I have kept execution_date in the log message for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835986921 ## airflow/models/dag.py: ## @@ -1337,42 +1322,35 @@ def set_task_group_state( """ from airflow.api.common.mark_tasks import set_state -if not exactly_one(execution_date, run_id): -raise ValueError("Exactly one of execution_date or run_id must be provided") - tasks_to_set_state: list[BaseOperator | tuple[BaseOperator, int]] = [] task_ids: list[str] = [] -if execution_date is None: -dag_run = session.scalars( -select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id) -).one() # Raises an error if not found -resolve_execution_date = dag_run.execution_date -else: -resolve_execution_date = execution_date - -end_date = resolve_execution_date if not future else None -start_date = resolve_execution_date if not past else None - task_group_dict = self.task_group.get_task_group_dict() task_group = task_group_dict.get(group_id) if task_group is None: raise ValueError("TaskGroup {group_id} could not be found") tasks_to_set_state = [task for task in task_group.iter_tasks() if isinstance(task, BaseOperator)] task_ids = [task.task_id for task in task_group.iter_tasks()] dag_runs_query = select(DagRun.id).where(DagRun.dag_id == self.dag_id) -if start_date is None and end_date is None: -dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date) -else: -if start_date is not None: -dag_runs_query = dag_runs_query.where(DagRun.execution_date >= start_date) -if end_date is not None: -dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date) + +@cache +def get_logical_date() -> datetime: +stmt = select(DagRun.logical_date).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id) +return session.scalars(stmt).one() # Raises an error if not found + +end_date = get_logical_date() if not future else None +start_date = get_logical_date() if not past else None Review Comment: Changed it -- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835985937 ## airflow/api_connexion/schemas/dag_run_schema.py: ## @@ -109,9 +101,9 @@ def autogenerate(self, data, **kwargs): @post_dump def autofill(self, data, **kwargs): -"""Populate execution_date from logical_date for compatibility.""" +"""Populate logical_date from logical_date for compatibility.""" ret_data = {} -data["execution_date"] = data["logical_date"] +data["logical_date"] = data["logical_date"] Review Comment: Removed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
sunank200 commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1835985394 ## airflow/api/common/mark_tasks.py: ## @@ -344,27 +318,15 @@ def set_dag_run_state_to_failed( Set for a specific execution date and its task instances to failed. :param dag: the DAG of which to alter state -:param execution_date: the execution date from which to start looking(deprecated) :param run_id: the DAG run_id to start looking from :param commit: commit DAG and tasks to be altered to the database :param session: database session :return: If commit is true, list of tasks that have been updated, otherwise list of tasks that will be updated -:raises: AssertionError if dag or execution_date is invalid +:raises: AssertionError if dag or logical_date is invalid Review Comment: No, I removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
Lee-W commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1832281554 ## airflow/models/dag.py: ## @@ -1337,42 +1322,35 @@ def set_task_group_state( """ from airflow.api.common.mark_tasks import set_state -if not exactly_one(execution_date, run_id): -raise ValueError("Exactly one of execution_date or run_id must be provided") - tasks_to_set_state: list[BaseOperator | tuple[BaseOperator, int]] = [] task_ids: list[str] = [] -if execution_date is None: -dag_run = session.scalars( -select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id) -).one() # Raises an error if not found -resolve_execution_date = dag_run.execution_date -else: -resolve_execution_date = execution_date - -end_date = resolve_execution_date if not future else None -start_date = resolve_execution_date if not past else None - task_group_dict = self.task_group.get_task_group_dict() task_group = task_group_dict.get(group_id) if task_group is None: raise ValueError("TaskGroup {group_id} could not be found") tasks_to_set_state = [task for task in task_group.iter_tasks() if isinstance(task, BaseOperator)] task_ids = [task.task_id for task in task_group.iter_tasks()] dag_runs_query = select(DagRun.id).where(DagRun.dag_id == self.dag_id) -if start_date is None and end_date is None: -dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date) -else: -if start_date is not None: -dag_runs_query = dag_runs_query.where(DagRun.execution_date >= start_date) -if end_date is not None: -dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date) + +@cache +def get_logical_date() -> datetime: +stmt = select(DagRun.logical_date).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id) +return session.scalars(stmt).one() # Raises an error if not found + +end_date = get_logical_date() if not future else None +start_date = get_logical_date() if not past else None Review Comment: ```suggestion end_date = None if future else 
get_logical_date() start_date = None if past else get_logical_date() ``` ## airflow/cli/commands/task_command.py: ## @@ -112,49 +147,48 @@ def _get_dag_run( the logical date; otherwise use it as a run ID and set the logical date to the current time. """ -if not exec_date_or_run_id and not create_if_necessary: -raise ValueError("Must provide `exec_date_or_run_id` if not `create_if_necessary`.") -execution_date: pendulum.DateTime | None = None -if exec_date_or_run_id: -dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, run_id=exec_date_or_run_id, session=session) -if dag_run: -return dag_run, False -with suppress(ParserError, TypeError): -execution_date = timezone.parse(exec_date_or_run_id) -if execution_date: -dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, execution_date=execution_date, session=session) -if dag_run: +if not logical_date_or_run_id and not create_if_necessary: +raise ValueError("Must provide `logical_date_or_run_id` if not `create_if_necessary`.") + +logical_date = None +if logical_date_or_run_id: +dag_run, logical_date = _fetch_dag_run_from_run_id_or_logical_date_string( +dag_id=dag.dag_id, +value=logical_date_or_run_id, +session=session, +) +if dag_run is not None: return dag_run, False elif not create_if_necessary: raise DagRunNotFound( -f"DagRun for {dag.dag_id} with run_id or execution_date " -f"of {exec_date_or_run_id!r} not found" +f"DagRun for {dag.dag_id} with run_id or logical_date " +f"of {logical_date_or_run_id!r} not found" ) -if execution_date is not None: -dag_run_execution_date = execution_date +if logical_date is not None: +dag_run_logical_date = logical_date else: -dag_run_execution_date = pendulum.instance(timezone.utcnow()) +dag_run_logical_date = pendulum.instance(timezone.utcnow()) if create_if_necessary == "memory": dag_run = DagRun( dag_id=dag.dag_id, -run_id=exec_date_or_run_id, -execution_date=dag_run_execution_date, - data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date), 
+run_id=logical_date_or_run_id, +logical_date=dag_run_logical_date, + data_interval=dag.timetable.infer_manual_data_interval(run_after
Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]
Lee-W commented on code in PR #42404: URL: https://github.com/apache/airflow/pull/42404#discussion_r1832256503 ## airflow/api_connexion/schemas/dag_run_schema.py: ## @@ -109,9 +101,9 @@ def autogenerate(self, data, **kwargs): @post_dump def autofill(self, data, **kwargs): -"""Populate execution_date from logical_date for compatibility.""" +"""Populate logical_date from logical_date for compatibility.""" ret_data = {} -data["execution_date"] = data["logical_date"] +data["logical_date"] = data["logical_date"] Review Comment: Do we still need this line? ## airflow/api/common/mark_tasks.py: ## @@ -344,27 +318,15 @@ def set_dag_run_state_to_failed( Set for a specific execution date and its task instances to failed. :param dag: the DAG of which to alter state -:param execution_date: the execution date from which to start looking(deprecated) :param run_id: the DAG run_id to start looking from :param commit: commit DAG and tasks to be altered to the database :param session: database session :return: If commit is true, list of tasks that have been updated, otherwise list of tasks that will be updated -:raises: AssertionError if dag or execution_date is invalid +:raises: AssertionError if dag or logical_date is invalid Review Comment: Do we still need to raise something for `logical_date`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org