Re: [PR] Implement backfill for partitioned Dags [airflow]
Lee-W merged PR #61464:
URL: https://github.com/apache/airflow/pull/61464

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Implement backfill for partitioned Dags [airflow]
Lee-W commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2844982324

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -218,17 +219,29 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:
         return val


-def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
+def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo):
     from airflow.models import DagRun

+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(
+                DagRun.start_date.is_(None),
+                DagRun.start_date.desc(),
+            )
+            .limit(1)
+        )
     return (
         select(DagRun)
         .where(
-            DagRun.logical_date == info.logical_date,
             DagRun.dag_id == dag_id,
+            DagRun.logical_date == info.logical_date,
         )
-        .order_by(nulls_first(DagRun.start_date.desc(), session=session))
-        .limit(1)
+        .limit(1)  # not really necessary since uniqueness constraint, but hey

Review Comment:
Updated
Re: [PR] Implement backfill for partitioned Dags [airflow]
uranusjr commented on PR #61464:
URL: https://github.com/apache/airflow/pull/61464#issuecomment-3948918874

Should be good except the one place mentioned above
Re: [PR] Implement backfill for partitioned Dags [airflow]
uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2844434989

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -218,17 +219,29 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:
         return val


-def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
+def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo):
     from airflow.models import DagRun

+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(
+                DagRun.start_date.is_(None),
+                DagRun.start_date.desc(),
+            )
+            .limit(1)
+        )
     return (
         select(DagRun)
         .where(
-            DagRun.logical_date == info.logical_date,
             DagRun.dag_id == dag_id,
+            DagRun.logical_date == info.logical_date,
         )
-        .order_by(nulls_first(DagRun.start_date.desc(), session=session))
-        .limit(1)
+        .limit(1)  # not really necessary since uniqueness constraint, but hey

Review Comment:
Probably simpler to rewrite this entire function to

```python
stmt = select(DagRun).where(DagRun.dag_id == dag_id)
if info.partition_key is not None:
    stmt = stmt.where(DagRun.partition_key == info.partition_key)
if info.logical_date is not None:
    stmt = stmt.where(DagRun.logical_date == info.logical_date)
stmt = stmt.order_by(DagRun.start_date.is_(None), DagRun.start_date.desc())
return stmt.limit(1)  # not really necessary since uniqueness constraint, but hey
```
Re: [PR] Implement backfill for partitioned Dags [airflow]
uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2844425223

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -218,17 +219,29 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:
         return val


-def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
+def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo):
     from airflow.models import DagRun

+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(
+                DagRun.start_date.is_(None),
+                DagRun.start_date.desc(),
+            )
+            .limit(1)
+        )
     return (
         select(DagRun)
         .where(
-            DagRun.logical_date == info.logical_date,
             DagRun.dag_id == dag_id,
+            DagRun.logical_date == info.logical_date,
         )
-        .order_by(nulls_first(DagRun.start_date.desc(), session=session))
-        .limit(1)
+        .limit(1)  # not really necessary since uniqueness constraint, but hey

Review Comment:
I think this query also needs to deal with start_date == None cases in ordering
Re: [PR] Implement backfill for partitioned dags [airflow]
Lee-W commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839857654

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -221,14 +222,23 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:

 def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
     from airflow.models import DagRun

+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(nulls_first(DagRun.start_date.desc(), session=session))

Review Comment:
Yep, just fixed it and added a unit test to catch it
Re: [PR] Implement backfill for partitioned dags [airflow]
Lee-W commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839584418

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -542,34 +615,94 @@ def _create_backfill(
         session.add(br)
         session.commit()

+        dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+        if not dag_model:
+            raise RuntimeError(f"Dag {dag_id} not found")
+
         dagrun_info_list = _get_info_list(
             from_date=from_date,
             to_date=to_date,
             reverse=reverse,
             dag=dag,
         )
+        if not dagrun_info_list:
+            raise RuntimeError(f"No runs to create for Dag {dag_id}")

Review Comment:
Ideally, this should not happen. It should be blocked from the API or even the UI layer.

> Why does the function need to return a list if we only need the first? (The previous code uses all the infos returned here)

We actually pass the list to the following functions
Re: [PR] Implement backfill for partitioned dags [airflow]
Lee-W commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839548270

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -542,34 +615,94 @@ def _create_backfill(
         session.add(br)
         session.commit()

+        dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+        if not dag_model:
+            raise RuntimeError(f"Dag {dag_id} not found")

Review Comment:
Yep, I think so. Updated it.
Re: [PR] Implement backfill for partitioned dags [airflow]
uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839515840

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -542,34 +615,94 @@ def _create_backfill(
         session.add(br)
         session.commit()

+        dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+        if not dag_model:
+            raise RuntimeError(f"Dag {dag_id} not found")
+
         dagrun_info_list = _get_info_list(
             from_date=from_date,
             to_date=to_date,
             reverse=reverse,
             dag=dag,
         )
+        if not dagrun_info_list:
+            raise RuntimeError(f"No runs to create for Dag {dag_id}")

Review Comment:
Is this supposed to never happen? Why does the function need to return a list if we only need the first? (The previous code uses all the infos returned here)
Re: [PR] Implement backfill for partitioned dags [airflow]
uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839510928

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -542,34 +615,94 @@ def _create_backfill(
         session.add(br)
         session.commit()

+        dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+        if not dag_model:
+            raise RuntimeError(f"Dag {dag_id} not found")

Review Comment:
Is this supposed to never happen? Maybe change this to `session.scalars(...).one()` instead and let SQLAlchemy do its thing.
Re: [PR] Implement backfill for partitioned dags [airflow]
uranusjr commented on code in PR #61464:
URL: https://github.com/apache/airflow/pull/61464#discussion_r2839488607

##
airflow-core/src/airflow/models/backfill.py:
##
@@ -221,14 +222,23 @@ def validate_sort_ordinal(self, key: str, val: int) -> int:

 def _get_latest_dag_run_row_query(*, dag_id: str, info: DagRunInfo, session: Session):
     from airflow.models import DagRun

+    if info.partition_key:
+        return (
+            select(DagRun)
+            .where(
+                DagRun.dag_id == dag_id,
+                DagRun.partition_key == info.partition_key,
+            )
+            .order_by(nulls_first(DagRun.start_date.desc(), session=session))

Review Comment:
I just realised we probably can’t do this (the logical_date variant was first introduced in #45062, fwict). This would put dag runs with null start_date first on Postgres, but on MySQL and SQLite, _null start_date rows will come last_. I’m not sure if we want to do this one way or the other, but we should do something to make the two return the same result.
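To illustrate the portability problem described above: with a plain `ORDER BY start_date DESC`, PostgreSQL places NULLs first by default while MySQL and SQLite place them last. A quick stdlib `sqlite3` sketch (the table and data are made up for illustration) shows SQLite's default and a boolean sort key that pins the order on every backend:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE run (id INTEGER PRIMARY KEY, start_date TEXT)")
con.executemany(
    "INSERT INTO run VALUES (?, ?)",
    [(1, "2024-01-01"), (2, None), (3, "2024-01-02")],
)

# SQLite (like MySQL) treats NULL as smaller than any value, so a plain
# DESC sort puts the NULL start_date row *last*:
plain = [r[0] for r in con.execute("SELECT id FROM run ORDER BY start_date DESC")]
print(plain)  # [3, 1, 2] -- PostgreSQL's DESC default (NULLS FIRST) would differ

# An explicit boolean sort key pins NULL placement on every backend:
pinned = [
    r[0]
    for r in con.execute(
        "SELECT id FROM run ORDER BY (start_date IS NULL) DESC, start_date DESC"
    )
]
print(pinned)  # [2, 3, 1] everywhere: the NULL row first, then newest-first
```

Flipping the boolean key to ascending (`(start_date IS NULL) ASC`, or SQLAlchemy's `DagRun.start_date.is_(None)` as a leading `order_by` term) gives the opposite, equally deterministic placement, which is the form the PR ended up using.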
Re: [PR] Implement backfill for partitioned dags [airflow]
Lee-W commented on PR #61464:
URL: https://github.com/apache/airflow/pull/61464#issuecomment-3942495364

Overall looks good to me. Tested locally. I'll add some unit tests and merge it.
[PR] Implement backfill for partitioned dags [airflow]
dstandish opened a new pull request, #61464:
URL: https://github.com/apache/airflow/pull/61464

Currently we have the same "reuse old dag runs" behavior when rerunning over existing dates. I would like to change that. Also, I need to update the migration so that it deletes the null values.
