Re: [PR] Implement backfill for partitioned Dags [airflow]

2026-02-23 Thread via GitHub


Lee-W merged PR #61464:
URL: https://github.com/apache/airflow/pull/61464


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Implement backfill for partitioned Dags [airflow]

2026-02-23 Thread via GitHub


Lee-W commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2844982324


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -218,17 +219,29 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:
         return val
 
 
-def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
+def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo):
     from airflow.models import DagRun
 
+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(
+                DagRun.start_date.is_(None),
+                DagRun.start_date.desc(),
+            )
+            .limit(1)
+        )
     return (
         select(DagRun)
         .where(
-            DagRun.logical_date == info.logical_date,
             DagRun.dag_id == dag_id,
+            DagRun.logical_date == info.logical_date,
         )
-        .order_by(nulls_first(DagRun.start_date.desc(), session=session))
-        .limit(1)
+        .limit(1)  # not really necessary since uniqueness constraint, but hey

Review Comment:
   Updated






Re: [PR] Implement backfill for partitioned Dags [airflow]

2026-02-23 Thread via GitHub


uranusjr commented on PR #61464:
URL: https://github.com/apache/airflow/pull/61464#issuecomment-3948918874

   Should be good except the one place mentioned above





Re: [PR] Implement backfill for partitioned Dags [airflow]

2026-02-23 Thread via GitHub


uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2844434989


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -218,17 +219,29 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:
         return val
 
 
-def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
+def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo):
     from airflow.models import DagRun
 
+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(
+                DagRun.start_date.is_(None),
+                DagRun.start_date.desc(),
+            )
+            .limit(1)
+        )
     return (
         select(DagRun)
         .where(
-            DagRun.logical_date == info.logical_date,
             DagRun.dag_id == dag_id,
+            DagRun.logical_date == info.logical_date,
         )
-        .order_by(nulls_first(DagRun.start_date.desc(), session=session))
-        .limit(1)
+        .limit(1)  # not really necessary since uniqueness constraint, but hey

Review Comment:
   Probably simpler to rewrite this entire function to
   
   ```python
   stmt = select(DagRun).where(DagRun.dag_id == dag_id)
   if info.partition_key is not None:
       stmt = stmt.where(DagRun.partition_key == info.partition_key)
   if info.logical_date is not None:
       stmt = stmt.where(DagRun.logical_date == info.logical_date)
   stmt = stmt.order_by(DagRun.start_date.is_(None), DagRun.start_date.desc())
   return stmt.limit(1)  # not really necessary since uniqueness constraint, but hey
   ```
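As a standalone sketch of the pattern suggested above (conditionally chaining `.where()` clauses onto a base SELECT, which SQLAlchemy combines with AND), using a toy model in place of Airflow's `DagRun`:

```python
from sqlalchemy import Column, DateTime, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Run(Base):  # stand-in for DagRun, for illustration only
    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)
    partition_key = Column(String, nullable=True)
    start_date = Column(DateTime, nullable=True)


def latest_run_query(dag_id, partition_key=None):
    # Base query, narrowed only when the optional filter is given.
    stmt = select(Run).where(Run.dag_id == dag_id)
    if partition_key is not None:
        stmt = stmt.where(Run.partition_key == partition_key)
    # "IS NULL" first keeps never-started runs at the end on every backend.
    stmt = stmt.order_by(Run.start_date.is_(None), Run.start_date.desc())
    return stmt.limit(1)


sql = str(latest_run_query("my_dag", partition_key="p1"))
print(sql)  # both WHERE conditions joined with AND, plus ORDER BY and LIMIT
```

Compiling the statement (here via `str()`) shows the two `.where()` calls merged into a single AND-ed WHERE clause.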






Re: [PR] Implement backfill for partitioned Dags [airflow]

2026-02-23 Thread via GitHub


uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2844425223


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -218,17 +219,29 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:
         return val
 
 
-def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
+def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo):
     from airflow.models import DagRun
 
+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(
+                DagRun.start_date.is_(None),
+                DagRun.start_date.desc(),
+            )
+            .limit(1)
+        )
     return (
         select(DagRun)
         .where(
-            DagRun.logical_date == info.logical_date,
             DagRun.dag_id == dag_id,
+            DagRun.logical_date == info.logical_date,
        )
-        .order_by(nulls_first(DagRun.start_date.desc(), session=session))
-        .limit(1)
+        .limit(1)  # not really necessary since uniqueness constraint, but hey

Review Comment:
   I think this query also needs to deal with start_date == None cases in ordering






Re: [PR] Implement backfill for partitioned dags [airflow]

2026-02-23 Thread via GitHub


Lee-W commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839857654


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -221,14 +222,23 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:
 def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
     from airflow.models import DagRun
 
+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(nulls_first(DagRun.start_date.desc(), session=session))

Review Comment:
   Yep, just fixed it and added a unit test to catch it






Re: [PR] Implement backfill for partitioned dags [airflow]

2026-02-23 Thread via GitHub


Lee-W commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839584418


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -542,34 +615,94 @@ def _create_backfill(
     session.add(br)
     session.commit()
 
+    dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+    if not dag_model:
+        raise RuntimeError(f"Dag {dag_id} not found")
+
     dagrun_info_list = _get_info_list(
         from_date=from_date,
         to_date=to_date,
         reverse=reverse,
         dag=dag,
     )
+    if not dagrun_info_list:
+        raise RuntimeError(f"No runs to create for Dag {dag_id}")

Review Comment:
   Ideally, this should not happen. It should be blocked from the API or even 
the UI layer.
   
   > Why does the function need to return a list if we only need the first? (The previous code uses all the infos returned here)
   
   We actually pass the list to the following functions.






Re: [PR] Implement backfill for partitioned dags [airflow]

2026-02-23 Thread via GitHub


Lee-W commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839548270


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -542,34 +615,94 @@ def _create_backfill(
     session.add(br)
     session.commit()
 
+    dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+    if not dag_model:
+        raise RuntimeError(f"Dag {dag_id} not found")

Review Comment:
   Yep, I think so. Updated it.






Re: [PR] Implement backfill for partitioned dags [airflow]

2026-02-23 Thread via GitHub


uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839515840


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -542,34 +615,94 @@ def _create_backfill(
     session.add(br)
     session.commit()
 
+    dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+    if not dag_model:
+        raise RuntimeError(f"Dag {dag_id} not found")
+
     dagrun_info_list = _get_info_list(
         from_date=from_date,
         to_date=to_date,
         reverse=reverse,
         dag=dag,
     )
+    if not dagrun_info_list:
+        raise RuntimeError(f"No runs to create for Dag {dag_id}")

Review Comment:
   Is this supposed to never happen? Why does the function need to return a list if we only need the first? (The previous code uses all the infos returned here)






Re: [PR] Implement backfill for partitioned dags [airflow]

2026-02-23 Thread via GitHub


uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839510928


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -542,34 +615,94 @@ def _create_backfill(
     session.add(br)
     session.commit()
 
+    dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+    if not dag_model:
+        raise RuntimeError(f"Dag {dag_id} not found")

Review Comment:
   Is this supposed to never happen? Maybe change this to 
`session.scalars(...).one()` instead and let SQLAlchemy do its thing.






Re: [PR] Implement backfill for partitioned dags [airflow]

2026-02-22 Thread via GitHub


uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839488607


##
airflow-core/src/airflow/models/backfill.py:
##
@@ -221,14 +222,23 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:
 def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
     from airflow.models import DagRun
 
+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(nulls_first(DagRun.start_date.desc(), session=session))

Review Comment:
   I just realised we probably can’t do this. (the logical_date variant was 
first introduced in #45062 fwict). This would put dag runs with null start_date 
first on Postgres, but on MySQL and SQLite, _null start_date rows will come 
last_. I’m not sure if we want to do this one way or the other, but we should 
do something to make the two return the same result.






Re: [PR] Implement backfill for partitioned dags [airflow]

2026-02-22 Thread via GitHub


Lee-W commented on PR #61464:
URL: https://github.com/apache/airflow/pull/61464#issuecomment-3942495364

   overall looks good to me. tested locally. I'll add some unit tests and merge it





[PR] Implement backfill for partitioned dags [airflow]

2026-02-04 Thread via GitHub


dstandish opened a new pull request, #61464:
URL: https://github.com/apache/airflow/pull/61464

   Currently we have the same "reuse old dag runs" behavior when rerunning over existing dates. I would like to change that.
   
   Also, I need to update the migration so that it deletes the null values.

