Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-20 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1848746052


##
airflow/api_connexion/schemas/task_instance_schema.py:
##
@@ -212,8 +210,8 @@ class SetTaskInstanceStateFormSchema(Schema):
 @validates_schema
 def validate_form(self, data, **kwargs):
 """Validate set task instance state form."""
-if not exactly_one(data.get("logical_date"), data.get("dag_run_id")):
-raise ValidationError("Exactly one of logical_date or dag_run_id must be provided")
+if not data.get("dag_run_id"):
+raise ValidationError("dag_run_id must be provided")

Review Comment:
   We don’t need this if we set `dag_run_id = fields.Str(required=True)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-19 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1848815726


##
airflow/api_connexion/schemas/task_instance_schema.py:
##
@@ -212,8 +210,8 @@ class SetTaskInstanceStateFormSchema(Schema):
 @validates_schema
 def validate_form(self, data, **kwargs):
 """Validate set task instance state form."""
-if not exactly_one(data.get("logical_date"), data.get("dag_run_id")):
-raise ValidationError("Exactly one of logical_date or dag_run_id must be provided")
+if not data.get("dag_run_id"):
+raise ValidationError("dag_run_id must be provided")

Review Comment:
   Thanks! Changed it.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-19 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1848530608


##
airflow/ti_deps/deps/exec_date_after_start_date_dep.py:
##
@@ -24,23 +24,23 @@
 class ExecDateAfterStartDateDep(BaseTIDep):

Review Comment:
   Part of another PR.



##
providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##
@@ -38,9 +38,14 @@
 from kubernetes.dynamic import DynamicClient
 from sqlalchemy import or_, select, update
 
+try:
+from airflow.cli.cli_config import ARG_LOGICAL_DATE
+except ImportError:  # 2.x compatibility.
+from airflow.cli.cli_config import (  # type: ignore[attr-defined, no-redef]
+ARG_EXECUTION_DATE as ARG_LOGICAL_DATE,
+)

Review Comment:
   Part of another PR.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-19 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1848529847


##
airflow/cli/commands/task_command.py:
##
@@ -112,49 +147,48 @@ def _get_dag_run(
the logical date; otherwise use it as a run ID and set the logical date
to the current time.
 """
-if not exec_date_or_run_id and not create_if_necessary:
-raise ValueError("Must provide `exec_date_or_run_id` if not `create_if_necessary`.")
-execution_date: pendulum.DateTime | None = None
-if exec_date_or_run_id:
-dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, run_id=exec_date_or_run_id, session=session)
-if dag_run:
-return dag_run, False
-with suppress(ParserError, TypeError):
-execution_date = timezone.parse(exec_date_or_run_id)
-if execution_date:
-dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, execution_date=execution_date, session=session)
-if dag_run:
+if not logical_date_or_run_id and not create_if_necessary:
+raise ValueError("Must provide `logical_date_or_run_id` if not `create_if_necessary`.")
+
+logical_date = None
+if logical_date_or_run_id:
+dag_run, logical_date = _fetch_dag_run_from_run_id_or_logical_date_string(
+dag_id=dag.dag_id,
+value=logical_date_or_run_id,
+session=session,
+)
+if dag_run is not None:
 return dag_run, False
 elif not create_if_necessary:
 raise DagRunNotFound(
-f"DagRun for {dag.dag_id} with run_id or execution_date "
-f"of {exec_date_or_run_id!r} not found"
+f"DagRun for {dag.dag_id} with run_id or logical_date "
+f"of {logical_date_or_run_id!r} not found"
 )
 
-if execution_date is not None:
-dag_run_execution_date = execution_date
+if logical_date is not None:
+dag_run_logical_date = logical_date
 else:
-dag_run_execution_date = pendulum.instance(timezone.utcnow())
+dag_run_logical_date = pendulum.instance(timezone.utcnow())
 
 if create_if_necessary == "memory":
 dag_run = DagRun(
 dag_id=dag.dag_id,
-run_id=exec_date_or_run_id,
-execution_date=dag_run_execution_date,
-data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date),
+run_id=logical_date_or_run_id,
+logical_date=dag_run_logical_date,
+data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
 triggered_by=DagRunTriggeredByType.CLI,
 )
 return dag_run, True
 elif create_if_necessary == "db":
 dag_run = dag.create_dagrun(
 state=DagRunState.QUEUED,
-execution_date=dag_run_execution_date,
+logical_date=dag_run_logical_date,
 run_id=_generate_temporary_run_id(),
-data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date),
+data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
 session=session,
 triggered_by=DagRunTriggeredByType.CLI,
 )
-return dag_run, True
+return dag_run, True  # type: ignore[return-value]

Review Comment:
   removed it
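
The removed code in the hunk above first looks the value up as a run ID and only then tries to parse it as a date. That order can be sketched without Airflow. This is a minimal illustration under stated assumptions: `RUNS` and `resolve_run` are hypothetical names, the in-memory dict stands in for the `DagRun` table, and `datetime.fromisoformat` stands in for Airflow's `timezone.parse`. It is not the PR's `_fetch_dag_run_from_run_id_or_logical_date_string`.

```python
from __future__ import annotations

from datetime import datetime, timezone

# Hypothetical stand-in for the DagRun table: run_id -> logical date.
RUNS = {
    "manual__2024-11-19": datetime(2024, 11, 19, tzinfo=timezone.utc),
}


def resolve_run(value: str, runs: dict[str, datetime]) -> tuple[str | None, datetime | None]:
    """Return (run_id, logical_date) for an argument that may be either one."""
    # 1. Treat the value as a run ID first.
    if value in runs:
        return value, runs[value]
    # 2. Otherwise try to read it as an ISO 8601 datetime.
    try:
        parsed = datetime.fromisoformat(value)
    except ValueError:
        return None, None  # neither a known run ID nor a parseable date
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assume UTC for naive input
    for run_id, logical_date in runs.items():
        if logical_date == parsed:
            return run_id, parsed
    return None, parsed  # a valid date, but no run exists for it
```

Returning the parsed date even when no run matches lets a caller reuse it when it goes on to create a run, which is what the `create_if_necessary` branches in the hunk do with `logical_date`.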






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-19 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1848528370


##
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##
@@ -522,19 +522,10 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
 if not task:
 error_message = f"Task ID {task_id} not found"
 raise NotFound(error_message)
-
-logical_date = data.get("logical_date")

Review Comment:
   removed it






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-19 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1848077091


##
airflow/www/views.py:
##
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
 
 logger.info("Retrieving rendered templates.")
 dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
-dag_run = dag.get_dagrun(logical_date=dttm)
+dag_run = dag.get_dagrun(
+select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1)

Review Comment:
   Added it



##
airflow/www/views.py:
##
@@ -2144,7 +2146,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION):
 form=form,
 )
 
-dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, logical_date=logical_date)
+dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id)

Review Comment:
   Changed it



##
tests/sensors/test_external_task_sensor.py:
##
@@ -1373,9 +1374,17 @@ def test_external_task_marker_clear_activate(dag_bag_parent_child, session):
 run_tasks(dag_bag, logical_date=day_1)
 run_tasks(dag_bag, logical_date=day_2)
 
+from sqlalchemy import select
+
 # Assert that dagruns of all the affected dags are set to SUCCESS before tasks are cleared.
 for dag, logical_date in itertools.product(dag_bag.dags.values(), [day_1, day_2]):
-dagrun = dag.get_dagrun(logical_date=logical_date, session=session)
+dagrun = dag.get_dagrun(
+run_id=select(DagRun.run_id)
+.where(DagRun.logical_date == logical_date)
+.order_by(DagRun.id.desc())
+.limit(1),
+session=session,
+)

Review Comment:
   Done for all of them






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847616824


##
providers/tests/google/common/hooks/test_base_google.py:
##


Review Comment:
   Same, not needed.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847616575


##
providers/tests/google/cloud/operators/test_bigquery_dts.py:
##


Review Comment:
   Changes in this file are not needed.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847630397


##
airflow/www/views.py:
##
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
 
 logger.info("Retrieving rendered templates.")
 dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
-dag_run = dag.get_dagrun(logical_date=dttm)
+dag_run = dag.get_dagrun(
+select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1)

Review Comment:
   Yep, this is the only place where we did not use `run_id=` and `session`; it was probably just missed.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847635422


##
tests/www/views/test_views.py:
##
@@ -396,8 +396,8 @@ def get_task_instance(session, task):
 # task_2 remains as SUCCESS
 assert get_task_instance(session, task_2).state == State.SUCCESS
 # task_3 and task_4 are cleared because they were in FAILED/UPSTREAM_FAILED state
-assert get_task_instance(session, task_3).state == State.NONE
-assert get_task_instance(session, task_4).state == State.NONE
+assert get_task_instance(session, task_3).state == State.UPSTREAM_FAILED
+assert get_task_instance(session, task_4).state == State.FAILED

Review Comment:
   These should not change, something is not right in the implementation.
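
For context, the behaviour that the original assertions encode can be modelled in a few lines: clearing failed tasks resets `FAILED` and `UPSTREAM_FAILED` task instances to no state and leaves `SUCCESS` alone. The sketch below is a simplified stand-in, not Airflow's `clear` implementation. The `State` enum and the `clear_failed` helper are hypothetical, and `None` plays the role of `State.NONE`.

```python
from __future__ import annotations

from enum import Enum


class State(str, Enum):
    """Hypothetical subset of task instance states."""

    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"


def clear_failed(states: dict[str, State | None]) -> dict[str, State | None]:
    """Reset failed and upstream-failed tasks to no state; keep everything else."""
    failed = {State.FAILED, State.UPSTREAM_FAILED}
    return {task: (None if state in failed else state) for task, state in states.items()}


before = {
    "task_2": State.SUCCESS,
    "task_3": State.UPSTREAM_FAILED,
    "task_4": State.FAILED,
}
after = clear_failed(before)
```

Under this model `task_3` and `task_4` end up with no state, which matches the assertions the diff removes rather than the ones it adds.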






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847631286


##
tests/models/test_dag.py:
##
@@ -2671,8 +2663,8 @@ def get_ti_from_db(task):
 # task_2 remains as SUCCESS
 assert get_ti_from_db(task_2).state == State.SUCCESS
 # task_3 and task_4 are cleared because they were in FAILED/UPSTREAM_FAILED state
-assert get_ti_from_db(task_3).state == State.NONE
-assert get_ti_from_db(task_4).state == State.NONE
+assert get_ti_from_db(task_3).state == State.UPSTREAM_FAILED

Review Comment:
   Yeah these states should not change; something is wrong here.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847630397


##
airflow/www/views.py:
##
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
 
 logger.info("Retrieving rendered templates.")
 dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
-dag_run = dag.get_dagrun(logical_date=dttm)
+dag_run = dag.get_dagrun(
+select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1)

Review Comment:
   Yep, this is the only place where we did not use `run_id=`.



##
airflow/www/views.py:
##
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
 
 logger.info("Retrieving rendered templates.")
 dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
-dag_run = dag.get_dagrun(logical_date=dttm)
+dag_run = dag.get_dagrun(
+select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1)

Review Comment:
   Yep, this is the only place where we did not use `run_id=`; it was probably just missed.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847616218


##
airflow/www/views.py:
##
@@ -2144,7 +2146,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION):
 form=form,
 )
 
-dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, logical_date=logical_date)
+dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id)

Review Comment:
   Same, it’d be a good idea to pass session in here.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847615440


##
airflow/models/slamiss.py:
##


Review Comment:
   This file should not be added.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847616027


##
airflow/www/views.py:
##
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
 
 logger.info("Retrieving rendered templates.")
 dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
-dag_run = dag.get_dagrun(logical_date=dttm)
+dag_run = dag.get_dagrun(
+select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1)

Review Comment:
   We don’t _need_ to, but should. Similar to the `dag.get_dagrun` call below.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847615530


##
airflow/settings.py:
##
@@ -831,6 +831,10 @@ def is_usage_data_collection_enabled() -> bool:
 
 ALLOW_FUTURE_LOGICAL_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)
 
+ALLOW_TRIGGER_DAGRUN_IN_FUTURE = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)
+
+ALLOW_TRIGGER_DAGRUN_IN_FUTURE = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)
+

Review Comment:
   Rebase error?






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847608478


##
airflow/cli/commands/task_command.py:
##
@@ -91,19 +91,54 @@ def _generate_temporary_run_id() -> str:
 return f"__airflow_temporary_run_{timezone.utcnow().isoformat()}__"
 
 
+def _fetch_dag_run_from_run_id_or_logical_date_string(
+*,
+dag_id: str,
+value: str,

Review Comment:
   We use `logical_date_or_run_id` in many places.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


uranusjr commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847608031


##
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##
@@ -522,19 +522,10 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
 if not task:
 error_message = f"Task ID {task_id} not found"
 raise NotFound(error_message)
-
-logical_date = data.get("logical_date")

Review Comment:
   `logical_date` should also be removed from `SetTaskInstanceStateFormSchema` 
(where this key came from) and the API spec.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847584414


##
airflow/www/views.py:
##
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
 
 logger.info("Retrieving rendered templates.")
 dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
-dag_run = dag.get_dagrun(logical_date=dttm)
+dag_run = dag.get_dagrun(
+select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1)

Review Comment:
   
   ```suggestion
   run_id=select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1)
   ```



##
tests/models/test_dag.py:
##
@@ -2671,8 +2663,8 @@ def get_ti_from_db(task):
 # task_2 remains as SUCCESS
 assert get_ti_from_db(task_2).state == State.SUCCESS
 # task_3 and task_4 are cleared because they were in FAILED/UPSTREAM_FAILED state
-assert get_ti_from_db(task_3).state == State.NONE
-assert get_ti_from_db(task_4).state == State.NONE
+assert get_ti_from_db(task_3).state == State.UPSTREAM_FAILED

Review Comment:
   May I know why we're changing the expected values here?



##
airflow/www/views.py:
##
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
 
 logger.info("Retrieving rendered templates.")
 dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
-dag_run = dag.get_dagrun(logical_date=dttm)
+dag_run = dag.get_dagrun(
+select(DagRun.run_id).where(DagRun.logical_date == dttm).order_by(DagRun.id.desc()).limit(1)

Review Comment:
   Do we need to pass `session` here?



##
tests/sensors/test_external_task_sensor.py:
##
@@ -1373,9 +1374,17 @@ def test_external_task_marker_clear_activate(dag_bag_parent_child, session):
 run_tasks(dag_bag, logical_date=day_1)
 run_tasks(dag_bag, logical_date=day_2)
 
+from sqlalchemy import select
+
 # Assert that dagruns of all the affected dags are set to SUCCESS before tasks are cleared.
 for dag, logical_date in itertools.product(dag_bag.dags.values(), [day_1, day_2]):
-dagrun = dag.get_dagrun(logical_date=logical_date, session=session)
+dagrun = dag.get_dagrun(
+run_id=select(DagRun.run_id)
+.where(DagRun.logical_date == logical_date)
+.order_by(DagRun.id.desc())
+.limit(1),
+session=session,
+)

Review Comment:
   ```suggestion
   run_id = (
   select(DagRun.run_id)
   .where(DagRun.logical_date == logical_date)
   .order_by(DagRun.id.desc())
   .limit(1)
   )
   dagrun = dag.get_dagrun(
   run_id=run_id,
   session=session,
   )
   ```
   
   Splitting it into two statements might make it easier to read.
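
The subquery in the suggestion picks the most recently inserted run for a given logical date. A rough equivalent using only the standard library's `sqlite3` is sketched below. The `dag_run` table layout, the sample rows, and the `latest_run_id` helper are assumptions made for illustration and do not reflect Airflow's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, heavily reduced version of a dag_run table.
conn.execute(
    "CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, run_id TEXT, logical_date TEXT)"
)
conn.executemany(
    "INSERT INTO dag_run (dag_id, run_id, logical_date) VALUES (?, ?, ?)",
    [
        ("example", "scheduled__1", "2024-11-18T00:00:00+00:00"),
        ("example", "manual__1", "2024-11-19T00:00:00+00:00"),
        ("example", "manual__2", "2024-11-19T00:00:00+00:00"),
    ],
)


def latest_run_id(conn, logical_date):
    """Return the run_id of the newest row with this logical date, or None."""
    row = conn.execute(
        "SELECT run_id FROM dag_run WHERE logical_date = ? ORDER BY id DESC LIMIT 1",
        (logical_date,),
    ).fetchone()
    return row[0] if row else None
```

When two rows share a logical date, ordering by `id` descending with a limit of one returns the later insert, here `manual__2`.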






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-18 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1846056467


##
airflow/models/dag.py:
##
@@ -1419,24 +1399,8 @@ def clear(
 session: Session = NEW_SESSION,
 dag_bag: DagBag | None = None,
 exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
+exclude_run_ids: frozenset[str] | None = frozenset(),
 ) -> int | Iterable[TaskInstance]:
-"""

Review Comment:
   Added it back






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-11 Thread via GitHub


sunank200 commented on PR #42404:
URL: https://github.com/apache/airflow/pull/42404#issuecomment-2469302913

   The following PR, https://github.com/apache/airflow/pull/43902, renames `execution_date` to `logical_date` across the codebase.





Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1836006380


##
airflow/models/dag.py:
##
@@ -1600,23 +1564,23 @@ def add_logger_if_needed(ti: TaskInstance):
 exit_stack.callback(lambda: secrets_backend_list.pop(0))
 
 with exit_stack:
-execution_date = execution_date or timezone.utcnow()
+logical_date = logical_date or timezone.utcnow()
 self.validate()
-self.log.debug("Clearing existing task instances for execution date %s", execution_date)
+self.log.debug("Clearing existing task instances for execution date %s", logical_date)

Review Comment:
   Sounds good 👍






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835991045


##
airflow/www/views.py:
##
@@ -1654,10 +1666,10 @@ def get_logs_with_metadata(self, session: Session = NEW_SESSION):
 
 # Convert string datetime into actual datetime
 try:
-execution_date = timezone.parse(execution_date_str, strict=True)
+logical_date = timezone.parse(logical_date_str, strict=True)
 except ValueError:
 error_message = (
-f"Given execution date {execution_date_str!r} could not be identified as a date. "
+f"Given execution date {logical_date_str!r} could not be identified as a date. "

Review Comment:
   Changed it 






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835989800


##
airflow/models/slamiss.py:
##
@@ -14,25 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-

Review Comment:
   Thanks. This file has to be removed.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835990646


##
airflow/www/views.py:
##
@@ -221,16 +221,16 @@ def get_safe_url(url):
 
 def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
 """Get Execution Data, Base Date & Number of runs from a Request."""
-date_time = www_request.args.get("execution_date")
+date_time = www_request.args.get("logical_date")
 run_id = www_request.args.get("run_id")
-# First check run id, then check execution date, if not fall back on the latest dagrun
+# First check run id, then check logical_date date, if not fall back on the latest dagrun

Review Comment:
   Changed it






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835988764


##
airflow/models/log.py:
##
@@ -73,7 +73,7 @@ def __init__(
 if task_instance:
 self.dag_id = task_instance.dag_id
 self.task_id = task_instance.task_id
-if execution_date := getattr(task_instance, "execution_date", None):
+if execution_date := getattr(task_instance, "logical_date", None):
 self.execution_date = execution_date

Review Comment:
   Yes, because the log model still uses `execution_date` in this PR.
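
A small sketch of the pattern in the hunk above, where the value is read from the new `logical_date` attribute but still stored under the old `execution_date` name. `LogRecord` is a hypothetical stand-in for the `Log` model, and `SimpleNamespace` stands in for a task instance.

```python
from types import SimpleNamespace


class LogRecord:
    """Hypothetical stand-in for a log row that still exposes `execution_date`."""

    def __init__(self, task_instance=None):
        self.execution_date = None
        # Read the renamed attribute; keep None when it is absent or unset.
        if task_instance is not None and (
            logical_date := getattr(task_instance, "logical_date", None)
        ):
            self.execution_date = logical_date


with_date = LogRecord(SimpleNamespace(logical_date="2024-11-10T00:00:00+00:00"))
without_date = LogRecord(SimpleNamespace())
```

Using `getattr` with a default keeps the constructor tolerant of objects that do not carry the attribute at all.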






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835988369


##
airflow/models/dag.py:
##
@@ -2439,24 +2403,24 @@ def _get_or_create_dagrun(
 :param dag: DAG to be used to find run.
 :param conf: Configuration to pass to newly created run.
 :param start_date: Start date of new run.
-:param execution_date: Logical date for finding an existing run.
+:param logical_date: Logical date for finding an existing run.

Review Comment:
   Sure. I will change them.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835987661


##
airflow/models/dag.py:
##
@@ -1600,23 +1564,23 @@ def add_logger_if_needed(ti: TaskInstance):
 exit_stack.callback(lambda: secrets_backend_list.pop(0))
 
 with exit_stack:
-execution_date = execution_date or timezone.utcnow()
+logical_date = logical_date or timezone.utcnow()
 self.validate()
-self.log.debug("Clearing existing task instances for execution date %s", execution_date)
+self.log.debug("Clearing existing task instances for execution date %s", logical_date)

Review Comment:
   This can be changed in a follow-up PR.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835987493


##
airflow/models/dag.py:
##
@@ -1600,23 +1564,23 @@ def add_logger_if_needed(ti: TaskInstance):
 exit_stack.callback(lambda: secrets_backend_list.pop(0))
 
 with exit_stack:
-execution_date = execution_date or timezone.utcnow()
+logical_date = logical_date or timezone.utcnow()
 self.validate()
-self.log.debug("Clearing existing task instances for execution date %s", execution_date)
+self.log.debug("Clearing existing task instances for execution date %s", logical_date)

Review Comment:
   I have kept `execution_date` in the log for now.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835986921


##
airflow/models/dag.py:
##
@@ -1337,42 +1322,35 @@ def set_task_group_state(
 """
 from airflow.api.common.mark_tasks import set_state
 
-if not exactly_one(execution_date, run_id):
-raise ValueError("Exactly one of execution_date or run_id must be provided")
-
 tasks_to_set_state: list[BaseOperator | tuple[BaseOperator, int]] = []
 task_ids: list[str] = []
 
-if execution_date is None:
-dag_run = session.scalars(
-select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id)
-).one()  # Raises an error if not found
-resolve_execution_date = dag_run.execution_date
-else:
-resolve_execution_date = execution_date
-
-end_date = resolve_execution_date if not future else None
-start_date = resolve_execution_date if not past else None
-
 task_group_dict = self.task_group.get_task_group_dict()
 task_group = task_group_dict.get(group_id)
 if task_group is None:
 raise ValueError("TaskGroup {group_id} could not be found")
 tasks_to_set_state = [task for task in task_group.iter_tasks() if isinstance(task, BaseOperator)]
 task_ids = [task.task_id for task in task_group.iter_tasks()]
 dag_runs_query = select(DagRun.id).where(DagRun.dag_id == self.dag_id)
-if start_date is None and end_date is None:
-dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date)
-else:
-if start_date is not None:
-dag_runs_query = dag_runs_query.where(DagRun.execution_date >= start_date)
-if end_date is not None:
-dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date)
+
+@cache
+def get_logical_date() -> datetime:
+stmt = select(DagRun.logical_date).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id)
+return session.scalars(stmt).one()  # Raises an error if not found
+
+end_date = get_logical_date() if not future else None
+start_date = get_logical_date() if not past else None

Review Comment:
   Changed it






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835985937


##
airflow/api_connexion/schemas/dag_run_schema.py:
##
@@ -109,9 +101,9 @@ def autogenerate(self, data, **kwargs):
 
 @post_dump
 def autofill(self, data, **kwargs):
-"""Populate execution_date from logical_date for compatibility."""
+"""Populate logical_date from logical_date for compatibility."""
 ret_data = {}
-data["execution_date"] = data["logical_date"]
+data["logical_date"] = data["logical_date"]

Review Comment:
   Removed it






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-10 Thread via GitHub


sunank200 commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1835985394


##
airflow/api/common/mark_tasks.py:
##
@@ -344,27 +318,15 @@ def set_dag_run_state_to_failed(
 Set for a specific execution date and its task instances to failed.
 
 :param dag: the DAG of which to alter state
-:param execution_date: the execution date from which to start looking(deprecated)
 :param run_id: the DAG run_id to start looking from
 :param commit: commit DAG and tasks to be altered to the database
 :param session: database session
 :return: If commit is true, list of tasks that have been updated,
  otherwise list of tasks that will be updated
-:raises: AssertionError if dag or execution_date is invalid
+:raises: AssertionError if dag or logical_date is invalid

Review Comment:
   No, I removed it.






Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-07 Thread via GitHub


Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1832281554


##
airflow/models/dag.py:
##
@@ -1337,42 +1322,35 @@ def set_task_group_state(
 """
 from airflow.api.common.mark_tasks import set_state
 
-if not exactly_one(execution_date, run_id):
-raise ValueError("Exactly one of execution_date or run_id must be provided")
-
 tasks_to_set_state: list[BaseOperator | tuple[BaseOperator, int]] = []
 task_ids: list[str] = []
 
-if execution_date is None:
-dag_run = session.scalars(
-select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id)
-).one()  # Raises an error if not found
-resolve_execution_date = dag_run.execution_date
-else:
-resolve_execution_date = execution_date
-
-end_date = resolve_execution_date if not future else None
-start_date = resolve_execution_date if not past else None
-
 task_group_dict = self.task_group.get_task_group_dict()
 task_group = task_group_dict.get(group_id)
 if task_group is None:
 raise ValueError("TaskGroup {group_id} could not be found")
 tasks_to_set_state = [task for task in task_group.iter_tasks() if isinstance(task, BaseOperator)]
 task_ids = [task.task_id for task in task_group.iter_tasks()]
 dag_runs_query = select(DagRun.id).where(DagRun.dag_id == self.dag_id)
-if start_date is None and end_date is None:
-dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date)
-else:
-if start_date is not None:
-dag_runs_query = dag_runs_query.where(DagRun.execution_date >= start_date)
-if end_date is not None:
-dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date)
+
+@cache
+def get_logical_date() -> datetime:
+stmt = select(DagRun.logical_date).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id)
+return session.scalars(stmt).one()  # Raises an error if not found
+
+end_date = get_logical_date() if not future else None
+start_date = get_logical_date() if not past else None

Review Comment:
   ```suggestion
   end_date = None if future else get_logical_date()
   start_date = None if past else get_logical_date()
   ```
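
To illustrate why the cached helper and the conditional form above work well together: the lookup runs at most once, and not at all when both `past` and `future` are set. A minimal sketch, assuming a hypothetical `resolve_date_range` function and a `fake_lookup` stand-in for the `DagRun.logical_date` query (neither name comes from the PR):

```python
from datetime import datetime, timezone
from functools import cache


def resolve_date_range(past: bool, future: bool, lookup):
    """Return (start_date, end_date, number_of_lookups)."""
    calls = 0

    @cache
    def get_logical_date() -> datetime:
        # Executed at most once per resolve_date_range() call.
        nonlocal calls
        calls += 1
        return lookup()

    # Same shape as the suggestion: skip the lookup when the bound is open.
    end_date = None if future else get_logical_date()
    start_date = None if past else get_logical_date()
    return start_date, end_date, calls


def fake_lookup() -> datetime:
    # Hypothetical stand-in for the database query in the PR.
    return datetime(2024, 11, 7, tzinfo=timezone.utc)


both_bounds = resolve_date_range(past=False, future=False, lookup=fake_lookup)
no_bounds = resolve_date_range(past=True, future=True, lookup=fake_lookup)
```

Here `both_bounds` needs the date twice but triggers only one lookup, while `no_bounds` triggers none.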



##
airflow/cli/commands/task_command.py:
##
@@ -112,49 +147,48 @@ def _get_dag_run(
the logical date; otherwise use it as a run ID and set the logical date
to the current time.
 """
-if not exec_date_or_run_id and not create_if_necessary:
-raise ValueError("Must provide `exec_date_or_run_id` if not `create_if_necessary`.")
-execution_date: pendulum.DateTime | None = None
-if exec_date_or_run_id:
-dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, run_id=exec_date_or_run_id, session=session)
-if dag_run:
-return dag_run, False
-with suppress(ParserError, TypeError):
-execution_date = timezone.parse(exec_date_or_run_id)
-if execution_date:
-dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, execution_date=execution_date, session=session)
-if dag_run:
+if not logical_date_or_run_id and not create_if_necessary:
+raise ValueError("Must provide `logical_date_or_run_id` if not `create_if_necessary`.")
+
+logical_date = None
+if logical_date_or_run_id:
+dag_run, logical_date = _fetch_dag_run_from_run_id_or_logical_date_string(
+dag_id=dag.dag_id,
+value=logical_date_or_run_id,
+session=session,
+)
+if dag_run is not None:
 return dag_run, False
 elif not create_if_necessary:
 raise DagRunNotFound(
-f"DagRun for {dag.dag_id} with run_id or execution_date "
-f"of {exec_date_or_run_id!r} not found"
+f"DagRun for {dag.dag_id} with run_id or logical_date "
+f"of {logical_date_or_run_id!r} not found"
 )
 
-if execution_date is not None:
-dag_run_execution_date = execution_date
+if logical_date is not None:
+dag_run_logical_date = logical_date
 else:
-dag_run_execution_date = pendulum.instance(timezone.utcnow())
+dag_run_logical_date = pendulum.instance(timezone.utcnow())
 
 if create_if_necessary == "memory":
 dag_run = DagRun(
 dag_id=dag.dag_id,
-run_id=exec_date_or_run_id,
-execution_date=dag_run_execution_date,
-data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date),
+run_id=logical_date_or_run_id,
+logical_date=dag_run_logical_date,
+data_interval=dag.timetable.infer_manual_data_interval(run_after

Re: [PR] Remove execution_date and logical_date from DAG Run APIs and Functions, transition to run_id as sole identifier for Airflow 3.0 [airflow]

2024-11-07 Thread via GitHub


Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1832256503


##
airflow/api_connexion/schemas/dag_run_schema.py:
##
@@ -109,9 +101,9 @@ def autogenerate(self, data, **kwargs):
 
 @post_dump
 def autofill(self, data, **kwargs):
-"""Populate execution_date from logical_date for compatibility."""
+"""Populate logical_date from logical_date for compatibility."""
 ret_data = {}
-data["execution_date"] = data["logical_date"]
+data["logical_date"] = data["logical_date"]

Review Comment:
   Do we still need this line?



##
airflow/api/common/mark_tasks.py:
##
@@ -344,27 +318,15 @@ def set_dag_run_state_to_failed(
 Set for a specific execution date and its task instances to failed.
 
 :param dag: the DAG of which to alter state
-:param execution_date: the execution date from which to start looking(deprecated)
 :param run_id: the DAG run_id to start looking from
 :param commit: commit DAG and tasks to be altered to the database
 :param session: database session
 :return: If commit is true, list of tasks that have been updated,
  otherwise list of tasks that will be updated
-:raises: AssertionError if dag or execution_date is invalid
+:raises: AssertionError if dag or logical_date is invalid

Review Comment:
   Do we still need to raise something for `logical_date`?


