Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
pierrejeambrun merged PR #42975: URL: https://github.com/apache/airflow/pull/42975 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
rawwar commented on code in PR #42975:
URL: https://github.com/apache/airflow/pull/42975#discussion_r1842399041

## airflow/api_fastapi/core_api/routes/public/dag_run.py: ##
@@ -142,3 +147,49 @@ def patch_dag_run(
     dag_run = session.get(DagRun, dag_run.id)
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))
+def clear_dag_run(
+    dag_id: str,
+    dag_run_id: str,
+    body: DAGRunClearBody,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+) -> TaskInstanceCollectionResponse | DAGRunResponse:
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
+    if dag_run is None:
+        raise HTTPException(
+            404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
+        )
+
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    start_date = dag_run.logical_date
+    end_date = dag_run.logical_date
+
+    if body.dry_run:
+        task_instances = dag.clear(
+            start_date=start_date,
+            end_date=end_date,
+            task_ids=None,
+            only_failed=False,
+            dry_run=True,
+            session=session,
+        )
+
+        return TaskInstanceCollectionResponse(
+            task_instances=[
+                TaskInstanceResponse.model_validate(ti, from_attributes=True) for ti in task_instances
+            ],
+            total_entries=len(task_instances),
+        )
+    else:
+        dag.clear(
+            start_date=dag_run.start_date,
+            end_date=dag_run.end_date,
+            task_ids=None,
+            only_failed=False,
+            session=session,
+        )
+        dag_run_cleared = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))

Review Comment:
Done

## airflow/api_fastapi/core_api/routes/public/dag_run.py: ##
@@ -142,3 +147,49 @@ def patch_dag_run(
     dag_run = session.get(DagRun, dag_run.id)
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))

Review Comment:
Done
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
pierrejeambrun commented on code in PR #42975:
URL: https://github.com/apache/airflow/pull/42975#discussion_r1842355598

## airflow/api_fastapi/core_api/routes/public/dag_run.py: ##
@@ -142,3 +147,49 @@ def patch_dag_run(
     dag_run = session.get(DagRun, dag_run.id)
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))

Review Comment:
```suggestion
@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([404]))
```
The convention changed in https://github.com/apache/airflow/pull/43932: 401 and 403 are now inherited directly from the base router.

## airflow/api_fastapi/core_api/routes/public/dag_run.py: ##
@@ -142,3 +147,49 @@ def patch_dag_run(
     dag_run = session.get(DagRun, dag_run.id)
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))
+def clear_dag_run(
+    dag_id: str,
+    dag_run_id: str,
+    body: DAGRunClearBody,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+) -> TaskInstanceCollectionResponse | DAGRunResponse:
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
+    if dag_run is None:
+        raise HTTPException(
+            404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
+        )
+
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    start_date = dag_run.logical_date
+    end_date = dag_run.logical_date
+
+    if body.dry_run:
+        task_instances = dag.clear(
+            start_date=start_date,
+            end_date=end_date,
+            task_ids=None,
+            only_failed=False,
+            dry_run=True,
+            session=session,
+        )
+
+        return TaskInstanceCollectionResponse(
+            task_instances=[
+                TaskInstanceResponse.model_validate(ti, from_attributes=True) for ti in task_instances
+            ],
+            total_entries=len(task_instances),
+        )
+    else:
+        dag.clear(
+            start_date=dag_run.start_date,
+            end_date=dag_run.end_date,
+            task_ids=None,
+            only_failed=False,
+            session=session,
+        )
+        dag_run_cleared = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))

Review Comment:
```suggestion
        dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id==dag_run.id))
```
Might be easier?
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
rawwar commented on code in PR #42975:
URL: https://github.com/apache/airflow/pull/42975#discussion_r1841698813

## tests/api_fastapi/core_api/routes/public/test_dag_run.py: ##
@@ -254,3 +254,40 @@ def test_delete_dag_run_not_found(self, test_client):
         assert response.status_code == 404
         body = response.json()
         assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found"
+
+
+class TestClearDagRun:
+    def test_clear_dag_run(self, test_client):
+        response = test_client.post(
+            f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json={"dry_run": False}
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["dag_id"] == DAG1_ID
+        assert body["run_id"] == DAG1_RUN1_ID
+        assert body["state"] == "queued"
+
+    @pytest.mark.parametrize(
+        "body",
+        [{"dry_run": True}, {}],
+    )
+    def test_clear_dag_run_dry_run(self, test_client, body):
+        response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json=body)
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == 1
+        for each in body["task_instances"]:
+            assert each["state"] is None

Review Comment:
~~Yeah. You are right. For some reason, `dag.clear` is returning task instances with a `None` state. I'm looking into it.~~

Actually, the TI state isn't being set when these dag runs are created. I will update the setup to also create task instances and set their state (the legacy tests already do this), and then assert on them in the test.
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
rawwar commented on code in PR #42975:
URL: https://github.com/apache/airflow/pull/42975#discussion_r1841663179

## airflow/api_fastapi/core_api/routes/public/dag_run.py: ##
@@ -141,3 +146,48 @@ async def patch_dag_run_state(
     dag_run = session.get(DagRun, dag_run.id)
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))
+async def clear_dag_run(
+    dag_id: str,
+    dag_run_id: str,
+    patch_body: DAGRunClearBody,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+) -> TaskInstanceCollectionResponse | DAGRunResponse:
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
+    if dag_run is None:
+        raise HTTPException(
+            404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
+        )
+
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+
+    if patch_body.dry_run:
+        task_instances = dag.clear(
+            start_date=dag_run.logical_date,
+            end_date=dag_run.logical_date,
+            task_ids=None,
+            only_failed=False,
+            dry_run=True,
+            session=session,
+        )
+
+        return TaskInstanceCollectionResponse(
+            task_instances=[
+                TaskInstanceResponse.model_validate(ti, from_attributes=True)
+                for ti in task_instances  # type: ignore[union-attr]

Review Comment:
I went with the second idea. I also changed the return type to `list[TaskInstance]` instead of `Iterable`.
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
pierrejeambrun commented on code in PR #42975:
URL: https://github.com/apache/airflow/pull/42975#discussion_r1838423844

## tests/api_fastapi/core_api/routes/public/test_dag_run.py: ##
@@ -254,3 +254,40 @@ def test_delete_dag_run_not_found(self, test_client):
         assert response.status_code == 404
         body = response.json()
         assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found"
+
+
+class TestClearDagRun:
+    def test_clear_dag_run(self, test_client):
+        response = test_client.post(
+            f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json={"dry_run": False}
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["dag_id"] == DAG1_ID
+        assert body["run_id"] == DAG1_RUN1_ID
+        assert body["state"] == "queued"
+
+    @pytest.mark.parametrize(
+        "body",
+        [{"dry_run": True}, {}],
+    )
+    def test_clear_dag_run_dry_run(self, test_client, body):
+        response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json=body)
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == 1
+        for each in body["task_instances"]:
+            assert each["state"] is None

Review Comment:
Shouldn't the TI state still be `success`? Before clearing it's in `success`, and after the dry_run it remains in `success`?

## airflow/api_fastapi/core_api/routes/public/dag_run.py: ##
@@ -141,3 +146,48 @@ async def patch_dag_run_state(
     dag_run = session.get(DagRun, dag_run.id)
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))
+async def clear_dag_run(
+    dag_id: str,
+    dag_run_id: str,
+    patch_body: DAGRunClearBody,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+) -> TaskInstanceCollectionResponse | DAGRunResponse:
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
+    if dag_run is None:
+        raise HTTPException(
+            404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
+        )
+
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+
+    if patch_body.dry_run:
+        task_instances = dag.clear(
+            start_date=dag_run.logical_date,
+            end_date=dag_run.logical_date,
+            task_ids=None,
+            only_failed=False,
+            dry_run=True,
+            session=session,
+        )
+
+        return TaskInstanceCollectionResponse(
+            task_instances=[
+                TaskInstanceResponse.model_validate(ti, from_attributes=True)
+                for ti in task_instances  # type: ignore[union-attr]

Review Comment:
Two solutions here:
- When we pass `dry_run=True` to the function, we know for sure that the return type is an `Iterable[TaskInstance]`. You can then safely cast it manually here with `typing.cast` so mypy does not complain.
- We can use `@overload` typing to make the signature of the function precise: when `dry_run=True` the return type is `Iterable[TaskInstance]`, and when `dry_run=False` the return type is `int`.

The second solution is better, I think, because the rest of the code base can reuse it and avoid manual type casting when it runs into the same situation.

## airflow/api_fastapi/core_api/routes/public/dag_run.py: ##
@@ -147,3 +152,48 @@ def patch_dag_run(
     dag_run = session.get(DagRun, dag_run.id)
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))
+def clear_dag_run(
+    dag_id: str,
+    dag_run_id: str,
+    patch_body: DAGRunClearBody,

Review Comment:
The route is a POST, so `post_body`? Actually, just `body` is a better name, I think. I started using `patch_body`, but I don't think it's great.
```suggestion
    body: DAGRunClearBody,
```

## airflow/api_fastapi/core_api/datamodels/dag_run.py: ##
@@ -41,6 +41,12 @@ class DAGRunPatchBody(BaseModel):
     note: str | None = Field(None, max_length=1000)
+class DAGRunClearBody(BaseModel):
+    """DAG Run serializer for clear endpoint body."""
+
+    dry_run: bool | None = True

Review Comment:
It can never be `None`, I believe. Also, in the legacy code the default value is `False` -> `dry_run = post_body.get("dry_run", False)`. I'm not sure we want to change that.
```suggestion
    dry_run: bool = True
```

## tests/api_fastapi/core_api/routes/public/test_dag_run.py: ##
@@ -254,3 +254,40 @@ def test_delete_dag_run_not_found(self, test_client):
         assert response.status_code == 404
         body = response.jso
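The `@overload` option from the review above can be sketched roughly as follows. This is only an illustration under stated assumptions: `TaskInstance` and `DAG` here are hypothetical stand-ins, not Airflow's classes, and the real `DAG.clear` takes many more parameters. The sketch returns `list[TaskInstance]` for the dry-run case, which is the return type the PR author later says was chosen.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal, overload


@dataclass
class TaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance model."""

    task_id: str
    state: str | None = None


class DAG:
    """Hypothetical stand-in; only the typing of clear() matters here."""

    def __init__(self, task_instances: list[TaskInstance]) -> None:
        self._tis = task_instances

    @overload
    def clear(self, *, dry_run: Literal[True]) -> list[TaskInstance]: ...

    @overload
    def clear(self, *, dry_run: Literal[False] = ...) -> int: ...

    def clear(self, *, dry_run: bool = False) -> int | list[TaskInstance]:
        if dry_run:
            # Report what would be cleared without modifying anything.
            return list(self._tis)
        # Reset each state and return how many task instances were cleared.
        for ti in self._tis:
            ti.state = None
        return len(self._tis)


dag = DAG([TaskInstance("get_astronauts", "success")])
preview = dag.clear(dry_run=True)  # type checkers see list[TaskInstance]
task_ids = [ti.task_id for ti in preview]  # no cast or type: ignore needed
cleared = dag.clear(dry_run=False)  # type checkers see int
```

With only these two overloads, a call that passes a plain `bool` variable matches neither of them, so a third overload accepting `bool` and returning the union is usually added for that case.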
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
rawwar commented on PR #42975:
URL: https://github.com/apache/airflow/pull/42975#issuecomment-2464591555

> I think we can do the same and return a partial response. There is a way to do that in fastapi specifying the response model. And we need to document it in the swagger with example responses.

I went with returning the entire object. Is that fine? If not, I can return the partial response. I already looked into how to do it.
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
rawwar commented on code in PR #42975:
URL: https://github.com/apache/airflow/pull/42975#discussion_r1833640939

## airflow/api_fastapi/core_api/routes/public/dag_run.py: ##
@@ -141,3 +146,48 @@ async def patch_dag_run_state(
     dag_run = session.get(DagRun, dag_run.id)
     return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+
+@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))
+async def clear_dag_run(
+    dag_id: str,
+    dag_run_id: str,
+    patch_body: DAGRunClearBody,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+) -> TaskInstanceCollectionResponse | DAGRunResponse:
+    dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
+    if dag_run is None:
+        raise HTTPException(
+            404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
+        )
+
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+
+    if patch_body.dry_run:
+        task_instances = dag.clear(
+            start_date=dag_run.logical_date,
+            end_date=dag_run.logical_date,
+            task_ids=None,
+            only_failed=False,
+            dry_run=True,
+            session=session,
+        )
+
+        return TaskInstanceCollectionResponse(
+            task_instances=[
+                TaskInstanceResponse.model_validate(ti, from_attributes=True)
+                for ti in task_instances  # type: ignore[union-attr]

Review Comment:
I had to include this `type: ignore` because mypy was complaining about `task_instances`. `dag.clear` has the return type `int | Iterable[TaskInstance]`, and the `int` is what causes the problem here.
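For illustration, the mypy complaint described above can be reproduced in miniature, together with a `typing.cast` workaround that avoids the `type: ignore`. The `clear` function and `TaskInstance` class below are hypothetical stand-ins that only mimic the union return type mentioned in the comment. They are not Airflow's implementation.

```python
from __future__ import annotations

from collections.abc import Iterable
from typing import cast


class TaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance model."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


def clear(dry_run: bool = False) -> int | Iterable[TaskInstance]:
    """Stand-in with the same union return type as described in the comment."""
    tis = [TaskInstance("get_astronauts"), TaskInstance("print_astronaut_craft")]
    return tis if dry_run else len(tis)


result = clear(dry_run=True)
# Iterating `result` directly is what mypy rejects: the `int` member of the
# union is not iterable. The cast records what we know from dry_run=True.
task_instances = cast("Iterable[TaskInstance]", result)
task_ids = [ti.task_id for ti in task_instances]
```

`cast` performs no runtime check, so it is only safe when the call site really guarantees the dry-run branch.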
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
pierrejeambrun commented on PR #42975:
URL: https://github.com/apache/airflow/pull/42975#issuecomment-2457134128

I think we can do the same and return a partial response. There is a way to do that in FastAPI by specifying the response model, and we need to document it in the Swagger docs.

The DetachedInstance error is most certainly due to bad session handling (the session used to fetch the objects is closed too early, or something like that).
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
rawwar commented on PR #42975:
URL: https://github.com/apache/airflow/pull/42975#issuecomment-2453441589

@pierrejeambrun, I noticed that in the legacy implementation, the clear dag run endpoint with dry_run=True is supposed to return a TaskInstanceCollection. However, the response only includes a few attributes of each TI. Below is an example response:

```
{
  "task_instances": [
    {
      "dag_id": "example_astronauts",
      "dag_run_id": "manual__2024-11-03T14:05:08.832062+00:00",
      "execution_date": "2024-11-03T14:05:08.832062+00:00",
      "task_id": "print_astronaut_craft"
    },
    {
      "dag_id": "example_astronauts",
      "dag_run_id": "manual__2024-11-03T14:05:08.832062+00:00",
      "execution_date": "2024-11-03T14:05:08.832062+00:00",
      "task_id": "get_astronauts"
    }
  ]
}
```

Should I keep the same shape, or should I return all the details?
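As a rough illustration of the partial shape in question, the sketch below projects a fuller task instance record down to the four fields in the example response. It uses plain dataclasses only. `TaskInstance`, `TaskInstanceReference`, `to_reference` and `dry_run_response` are hypothetical names introduced for this sketch, and the extra fields (`state`, `try_number`) are assumptions chosen to show that they get dropped. In the FastAPI endpoint the same narrowing would presumably be expressed through a narrower response model, which is not shown here.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class TaskInstance:
    """Hypothetical stand-in carrying more fields than the legacy response exposes."""

    dag_id: str
    dag_run_id: str
    execution_date: str
    task_id: str
    state: str | None = None
    try_number: int = 0


@dataclass
class TaskInstanceReference:
    """Only the four fields present in the legacy dry-run example."""

    dag_id: str
    dag_run_id: str
    execution_date: str
    task_id: str


def to_reference(ti: TaskInstance) -> TaskInstanceReference:
    # Copy just the identifying fields and drop everything else.
    return TaskInstanceReference(ti.dag_id, ti.dag_run_id, ti.execution_date, ti.task_id)


def dry_run_response(tis: list[TaskInstance]) -> dict:
    return {"task_instances": [asdict(to_reference(ti)) for ti in tis]}


run_id = "manual__2024-11-03T14:05:08.832062+00:00"
when = "2024-11-03T14:05:08.832062+00:00"
response = dry_run_response(
    [TaskInstance("example_astronauts", run_id, when, "get_astronauts", state="success")]
)
```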
Re: [PR] AIP-84 Migrate Clear Dag Run public endpoint to FastAPI [airflow]
rawwar closed pull request #42975: AIP-84 Migrate Clear Dag Run public endpoint to FastAPI
URL: https://github.com/apache/airflow/pull/42975