Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
potiuk commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864885355 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select, update + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +SessionDep, +create_openapi_http_exception_doc, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( 
+worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", Review Comment: Wait... It's not human :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
potiuk commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864885260 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select, update + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +SessionDep, +create_openapi_http_exception_doc, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( 
+worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", Review Comment: Indeed. Almost un-human with this check 😱
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864881282 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select, update + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +SessionDep, +create_openapi_http_exception_doc, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( 
+worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", Review Comment: Copilot is getting smarter - nice
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
potiuk commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864878987 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select, update + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +SessionDep, +create_openapi_http_exception_doc, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( 
+worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", Review Comment: AHA.. Cool.
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl merged PR #44433: URL: https://github.com/apache/airflow/pull/44433
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864801279 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select, update + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +SessionDep, +create_openapi_http_exception_doc, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( 
+worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", Review Comment: @potiuk I also thought so at first... then, reading it again, I saw that I really had made a copy-and-paste error and the description was for a different case. Here the worker reports the queues and capacity it supports, which serve as filter criteria for the service to select the next job.
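The selection described in this comment can be sketched roughly as follows. This is a minimal illustration, not the provider's code: `QueuedJob` and `pick_next_job` are hypothetical names, the jobs are assumed to be ordered oldest first, and filtering on free capacity is an assumption drawn from the comment (the `fetch` hunk quoted in this thread only filters on queues).

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class QueuedJob:
    """Hypothetical stand-in for a queued job row (not the real EdgeJobModel)."""

    task_id: str
    queue: str
    concurrency_slots: int


def pick_next_job(jobs, queues, free_concurrency):
    """Return the first job that matches the worker's queues and free capacity."""
    for job in jobs:  # assumed to be ordered oldest first
        if queues and job.queue not in queues:
            continue  # the worker does not serve this queue
        if job.concurrency_slots > free_concurrency:
            continue  # the job needs more slots than the worker has free
        return job
    return None


jobs = [
    QueuedJob("a", "gpu", 1),
    QueuedJob("b", "default", 4),
    QueuedJob("c", "default", 1),
]
selected = pick_next_job(jobs, queues=["default"], free_concurrency=2)
print(selected.task_id if selected else None)  # prints "c"
```

An empty or missing queue list is treated as "any queue" here, mirroring the `if body.queues:` check in the quoted hunk.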
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
potiuk commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864574326 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select, update + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +SessionDep, +create_openapi_http_exception_doc, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( 
+worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", Review Comment: That's a strange suggestion copilot
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
Copilot commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864570662 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select, update + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +SessionDep, +create_openapi_http_exception_doc, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( 
+worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", Review Comment: The description for the `body` parameter in the `fetch` function is incorrect. It should describe the purpose of `WorkerQueuesBody`.
```suggestion
description="The queues from which the worker can fetch jobs.",
```
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on PR #44433: URL: https://github.com/apache/airflow/pull/44433#issuecomment-2509403396
> I will wait for 1 and 2 to be merged :)

You just merged 2 after I merged 1... so "ball is in your field" now :-D
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864437361 ## providers/src/airflow/providers/edge/worker_api/datamodels.py: ## @@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase): ) -class WorkerStateBody(BaseModel): +class EdgeJobBase(BaseModel): +"""Basic attributes of a job on the edge worker.""" + +dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to which the task belongs.") +task_id: str = Field(title="Task ID", description="Task name in the DAG.") +run_id: str = Field(title="Run ID", description="Run ID of the DAG execution.") +map_index: int = Field( +title="Map Index", +description="For dynamically mapped tasks the mapping number, -1 if the task is not mapped.", +) +try_number: int = Field(title="Try Number", description="The number of attempt to execute this task.") + +@property +def key(self) -> TaskInstanceKey: +return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index) + + +class EdgeJobFetched(EdgeJobBase): +"""Job that is to be executed on the edge worker.""" + +command: list[str] = Field(title="Command", description="Command line to use to execute the job.") +concurrency_slots: int = Field(description="Number of concurrency slots the job requires.") + + +class WorkerQueuesBase(BaseModel): +"""Queues that a worker supports to run jobs on.""" + +queues: Optional[List[str]] = Field( # noqa: UP006, UP007 - prevent pytest failing in back-compat Review Comment: Okay, I pushed a commit to clean this up, and it seems the Pydantic upgrade fixed it. The change is in the last commit.
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864424751 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select +from sqlalchemy.orm import Session # noqa: TCH002 + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +create_openapi_http_exception_doc, +get_session, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, 
+status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( +worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", +), +], +session: Annotated[Session, Depends(get_session)], +) -> EdgeJobFetched | None: +"""Fetch a job to execute on the edge worker.""" +query = ( +select(EdgeJobModel) +.where(EdgeJobModel.state == TaskInstanceState.QUEUED) +.order_by(EdgeJobModel.queued_dttm) +) +if body.queues: +query = query.where(EdgeJobModel.queue.in_(body.queues)) +query = query.limit(1) +query = with_row_locks(query, of=EdgeJobModel, session=session, skip_locked=True) +job: EdgeJobModel = session.scalar(query) +if not job: +return None +job.state = TaskInstanceState.RUNNING +job.edge_worker = worker_name +job.last_update = timezone.utcnow() +session.commit() +return EdgeJobFetched( +dag_id=job.dag_id, +task_id=job.task_id, +run_id=job.run_id, +map_index=job.map_index, +try_number=job.try_number, +command=literal_eval(job.command), +concurrency_slots=job.concurrency_slots, +) + + +@jobs_router.patch( +"/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def state( +dag_id: Annotated[str, WorkerApiDocs.dag_id], +task_id: Annotated[str, WorkerApiDocs.task_id], +run_id: Annotated[str, WorkerApiDocs.run_id], +try_number: Annotated[int, WorkerApiDocs.try_number], +map_index: Annotated[int, WorkerApiDocs.map_index], +state: Annotated[TaskInstanceState, WorkerApiDocs.state], +session: Annotated[Session, Depends(get_session)], +) -> None: +"""Update the state of a job running on the edge worker.""" +query = select(EdgeJobModel).where( +EdgeJobModel.dag_id == dag_id, +EdgeJobModel.task_id == task_id, +EdgeJobModel.run_id == run_id, +EdgeJobModel.map_index == 
map_index, +EdgeJobModel.try_number == try_number, +) +job: EdgeJobModel = session.scalar(query) +if job: +job.state = state +job.last_update = timezone.utcnow() +session.commit() Review Comment: Ah, that is a good thing to optimize. It should have caught my eye earlier!
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864421800 ## providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py: ## @@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> return e.to_response() # type: ignore[attr-defined] +@provide_session +def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, session=NEW_SESSION) -> Any: +"""Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` endpoint for Airflow 2.10.""" +from flask import request + +try: +auth = request.headers.get("Authorization", "") +jwt_token_authorization(request.path, auth) +queues = body["queues"] if body else None +free_concurrency = body["free_concurrency"] if body else 1 +request_obj = WorkerQueuesBody(queues=queues, free_concurrency=free_concurrency) +job: EdgeJobFetched | None = fetch(worker_name, request_obj, session) +return job.model_dump() if job is not None else None +except HTTPException as e: +return e.to_response() # type: ignore[attr-defined] + + +@provide_session +def job_state_v2( +dag_id: str, +task_id: str, +run_id: str, +try_number: int, +map_index: str, # Note: Connexion can not have negative numbers in path parameters, use string therefore Review Comment: As answered above: internally I want `int` everywhere; only the Connexion wrapper entry point uses `str`, to work around Flask's limitation with negative integer values.
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864421516 ## providers/src/airflow/providers/edge/worker_api/datamodels.py: ## @@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase): ) -class WorkerStateBody(BaseModel): +class EdgeJobBase(BaseModel): +"""Basic attributes of a job on the edge worker.""" + +dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to which the task belongs.") +task_id: str = Field(title="Task ID", description="Task name in the DAG.") +run_id: str = Field(title="Run ID", description="Run ID of the DAG execution.") +map_index: int = Field( +title="Map Index", +description="For dynamically mapped tasks the mapping number, -1 if the task is not mapped.", +) Review Comment: Only Flask has a parsing problem, so the Airflow 2.10 REST implementation in Connexion uses "string" for the map index, as it can be negative. I convert it with int() before it goes into the logic. The primary implementation is FastAPI, which handles negative integer values correctly. So map_index is treated as a string only in the Connexion OpenAPI specs and in `_v2_routes.py`, to make it work. Otherwise the FastAPI and Pydantic models should be consistent.
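A minimal sketch of the string-to-int conversion at the wrapper boundary described in this comment. The function names `set_state` and `set_state_compat` are hypothetical and the logic is reduced to returning a dict; only the idea (the string is accepted at the entry point and converted once, so internals only see `int`) comes from the comment.

```python
from __future__ import annotations


def set_state(dag_id: str, task_id: str, map_index: int, state: str) -> dict:
    """Hypothetical core logic that only ever sees an integer map_index."""
    return {"dag_id": dag_id, "task_id": task_id, "map_index": map_index, "state": state}


def set_state_compat(dag_id: str, task_id: str, map_index: str, state: str) -> dict:
    """Hypothetical wrapper: takes map_index as a string and converts it once."""
    # int() parses a leading minus sign, so "-1" (task not mapped) becomes -1.
    return set_state(dag_id, task_id, int(map_index), state)


result = set_state_compat("my_dag", "my_task", "-1", "success")
print(result["map_index"])  # prints -1
```

Keeping the conversion in the wrapper means the typed core function and the data models do not need to know about the path-parameter workaround.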
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864376303 ## providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py: ## @@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> return e.to_response() # type: ignore[attr-defined] +@provide_session +def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, session=NEW_SESSION) -> Any: Review Comment: That would be the type in FastAPI. But here we are in the "compatibility layer", which uses the FastAPI functions and wraps them into the same endpoint for Connexion. Connexion gets the JSON model as a dump, not a Pydantic object.
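The wrapping pattern described in this comment might look roughly like the sketch below. It uses standard-library dataclasses as stand-ins for the Pydantic models so that it runs without extra packages; `QueuesRequest`, `FetchedJob`, `fetch`, and `fetch_compat` are hypothetical names, and the `fetch` body is a placeholder rather than the real job selection.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class QueuesRequest:
    """Stand-in for the typed request body (a Pydantic model in the PR)."""

    queues: list[str] | None
    free_concurrency: int


@dataclass
class FetchedJob:
    """Stand-in for the typed response model."""

    task_id: str
    command: list[str]


def fetch(worker_name: str, body: QueuesRequest) -> FetchedJob | None:
    """Typed core function; returns a fixed job purely for illustration."""
    if body.queues is not None and "default" not in body.queues:
        return None
    return FetchedJob(task_id="example_task", command=["echo", worker_name])


def fetch_compat(worker_name: str, body: dict[str, Any] | None = None) -> dict[str, Any] | None:
    """Wrapper for a framework that hands over the JSON body as a plain dict."""
    queues = body.get("queues") if body else None
    free_concurrency = body.get("free_concurrency", 1) if body else 1
    request_obj = QueuesRequest(queues=queues, free_concurrency=free_concurrency)
    job = fetch(worker_name, request_obj)
    # Convert the typed result back into plain JSON-compatible data.
    return asdict(job) if job is not None else None


print(fetch_compat("worker-1", {"queues": ["default"], "free_concurrency": 2}))
```

The wrapper is the only place that deals with untyped dicts; the core function keeps a typed signature in both directions.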
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1864373918 ## providers/src/airflow/providers/edge/worker_api/datamodels.py: ## @@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase): ) -class WorkerStateBody(BaseModel): +class EdgeJobBase(BaseModel): +"""Basic attributes of a job on the edge worker.""" + +dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to which the task belongs.") +task_id: str = Field(title="Task ID", description="Task name in the DAG.") +run_id: str = Field(title="Run ID", description="Run ID of the DAG execution.") +map_index: int = Field( +title="Map Index", +description="For dynamically mapped tasks the mapping number, -1 if the task is not mapped.", +) +try_number: int = Field(title="Try Number", description="The number of attempt to execute this task.") + +@property +def key(self) -> TaskInstanceKey: +return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index) + + +class EdgeJobFetched(EdgeJobBase): +"""Job that is to be executed on the edge worker.""" + +command: list[str] = Field(title="Command", description="Command line to use to execute the job.") +concurrency_slots: int = Field(description="Number of concurrency slots the job requires.") + + +class WorkerQueuesBase(BaseModel): +"""Queues that a worker supports to run jobs on.""" + +queues: Optional[List[str]] = Field( # noqa: UP006, UP007 - prevent pytest failing in back-compat Review Comment: I have this on my list to check. Maybe the problem went away in the last few days with the switch to Pydantic 2.10.2. It was a problem when pytest was running in back-compat: Pydantic could not load because it was not able to evaluate the type.
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
potiuk commented on PR #44433: URL: https://github.com/apache/airflow/pull/44433#issuecomment-2508408909 > UPDATE: Okay now I realize that the internal API is gone but all the previous functions are moved to provider package... need to catch-up coming home... :eyes:
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl commented on PR #44433: URL: https://github.com/apache/airflow/pull/44433#issuecomment-2508280842 > I will wait for 1 and 2 to be merged :) As the internal API is now "gone", I will have a hard time getting #1+#2 merged. As there are some merge conflicts as well, I'll focus on PR#3 + #4... But anyway, let me resolve the conflicts and integrate the feedback from Kaxil; then it might be cleaner already.
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
potiuk commented on PR #44433: URL: https://github.com/apache/airflow/pull/44433#issuecomment-2507681738 I will wait for 1 and 2 to be merged :)
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1862845850 ## providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py: ## @@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> return e.to_response() # type: ignore[attr-defined] +@provide_session +def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, session=NEW_SESSION) -> Any: Review Comment: Isn't the return type deterministic: `EdgeJobFetched | None`?
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1862849168 ## providers/src/airflow/providers/edge/worker_api/routes/jobs.py: ## @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ast import literal_eval +from typing import Annotated + +from sqlalchemy import select +from sqlalchemy.orm import Session # noqa: TCH002 + +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest +from airflow.providers.edge.worker_api.datamodels import ( +EdgeJobFetched, +WorkerApiDocs, +WorkerQueuesBody, +) +from airflow.providers.edge.worker_api.routes._v2_compat import ( +AirflowRouter, +Body, +Depends, +create_openapi_http_exception_doc, +get_session, +status, +) +from airflow.utils import timezone +from airflow.utils.sqlalchemy import with_row_locks +from airflow.utils.state import TaskInstanceState + +jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") + + +@jobs_router.get( +"/fetch/{worker_name}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, 
+status.HTTP_403_FORBIDDEN, +] +), +) +def fetch( +worker_name: str, +body: Annotated[ +WorkerQueuesBody, +Body( +title="Log data chunks", +description="The worker remote has no access to log sink and with this can send log chunks to the central site.", +), +], +session: Annotated[Session, Depends(get_session)], +) -> EdgeJobFetched | None: +"""Fetch a job to execute on the edge worker.""" +query = ( +select(EdgeJobModel) +.where(EdgeJobModel.state == TaskInstanceState.QUEUED) +.order_by(EdgeJobModel.queued_dttm) +) +if body.queues: +query = query.where(EdgeJobModel.queue.in_(body.queues)) +query = query.limit(1) +query = with_row_locks(query, of=EdgeJobModel, session=session, skip_locked=True) +job: EdgeJobModel = session.scalar(query) +if not job: +return None +job.state = TaskInstanceState.RUNNING +job.edge_worker = worker_name +job.last_update = timezone.utcnow() +session.commit() +return EdgeJobFetched( +dag_id=job.dag_id, +task_id=job.task_id, +run_id=job.run_id, +map_index=job.map_index, +try_number=job.try_number, +command=literal_eval(job.command), +concurrency_slots=job.concurrency_slots, +) + + +@jobs_router.patch( +"/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}", +dependencies=[Depends(jwt_token_authorization_rest)], +responses=create_openapi_http_exception_doc( +[ +status.HTTP_400_BAD_REQUEST, +status.HTTP_403_FORBIDDEN, +] +), +) +def state( +dag_id: Annotated[str, WorkerApiDocs.dag_id], +task_id: Annotated[str, WorkerApiDocs.task_id], +run_id: Annotated[str, WorkerApiDocs.run_id], +try_number: Annotated[int, WorkerApiDocs.try_number], +map_index: Annotated[int, WorkerApiDocs.map_index], +state: Annotated[TaskInstanceState, WorkerApiDocs.state], +session: Annotated[Session, Depends(get_session)], +) -> None: +"""Update the state of a job running on the edge worker.""" +query = select(EdgeJobModel).where( +EdgeJobModel.dag_id == dag_id, +EdgeJobModel.task_id == task_id, +EdgeJobModel.run_id == run_id, +EdgeJobModel.map_index == 
map_index, +EdgeJobModel.try_number == try_number, +) +job: EdgeJobModel = session.scalar(query) +if job: +job.state = state +job.last_update = timezone.utcnow() +session.commit() Review Comment: ```py update(EdgeJobModel).where( EdgeJobModel.dag_id == dag_id, EdgeJobModel.task_id == task_id, EdgeJobModel.run_id == run_id, EdgeJobModel.map_index == map_index, EdgeJobModel.try_number == try_number, ) ).values(state=state, last_update=t
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1862846445 ## providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py: ## @@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> return e.to_response() # type: ignore[attr-defined] +@provide_session +def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, session=NEW_SESSION) -> Any: +"""Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` endpoint for Airflow 2.10.""" +from flask import request + +try: +auth = request.headers.get("Authorization", "") +jwt_token_authorization(request.path, auth) +queues = body["queues"] if body else None +free_concurrency = body["free_concurrency"] if body else 1 +request_obj = WorkerQueuesBody(queues=queues, free_concurrency=free_concurrency) +job: EdgeJobFetched | None = fetch(worker_name, request_obj, session) +return job.model_dump() if job is not None else None +except HTTPException as e: +return e.to_response() # type: ignore[attr-defined] + + +@provide_session +def job_state_v2( +dag_id: str, +task_id: str, +run_id: str, +try_number: int, +map_index: str, # Note: Connexion can not have negative numbers in path parameters, use string therefore Review Comment: It's `str` here, so why `int` above? https://github.com/apache/airflow/pull/44433/commits/a7df2861cf97ae1c8e14b766bbb7b8d687058acc#r1862844010
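The `str`/`int` mismatch questioned here traces back to the Connexion limitation noted in the diff comment (negative numbers cannot be used in path parameters). One possible way to keep `int` everywhere else is to convert once at the Connexion route boundary. The sketch below assumes a hypothetical helper, `parse_map_index`, which is not part of the PR; the lower bound of -1 follows the field description ("-1 if the task is not mapped"), and rejecting smaller values is an assumption.

```python
def parse_map_index(raw: str) -> int:
    """Convert a path-parameter string such as "-1" or "3" to an int.

    Hypothetical helper (not from the PR): it only illustrates converting at
    the route boundary so the rest of the code can work with ``int``.
    """
    try:
        value = int(raw)
    except ValueError as exc:
        raise ValueError(f"map_index must be an integer, got {raw!r}") from exc
    if value < -1:
        # Assumption: -1 marks an unmapped task, so lower values are rejected.
        raise ValueError(f"map_index must be >= -1, got {value}")
    return value
```

With such a helper, the Airflow 2.10 route could keep `map_index: str` in its signature while passing an `int` on to the shared logic.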
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1862845347 ## providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py: ## @@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> return e.to_response() # type: ignore[attr-defined] +@provide_session +def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, session=NEW_SESSION) -> Any: +"""Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` endpoint for Airflow 2.10.""" +from flask import request + +try: +auth = request.headers.get("Authorization", "") +jwt_token_authorization(request.path, auth) +queues = body["queues"] if body else None +free_concurrency = body["free_concurrency"] if body else 1 Review Comment:
```py
free_concurrency = body.get("free_concurrency", 1)
```
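Since `body` is declared as `dict[str, Any] | None` in the diff, calling `body.get(...)` directly would raise `AttributeError` when no body is sent. A minimal sketch of a None-safe variant follows. The helper name `read_queue_request` is an assumption for illustration; only the two keys and the default of 1 come from the diff.

```python
from typing import Any, Optional


def read_queue_request(
    body: Optional[dict[str, Any]],
) -> tuple[Optional[list[str]], int]:
    """Return (queues, free_concurrency), applying defaults for a missing body or key.

    Hypothetical helper, not part of the PR.
    """
    data = body or {}  # treat a missing (or empty) body like an empty dict
    return data.get("queues"), data.get("free_concurrency", 1)
```

Note the behaviour differs slightly from the original lines: a non-empty body that lacks a key falls back to the default here, whereas `body["free_concurrency"]` would raise `KeyError`.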
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1862844761 ## providers/src/airflow/providers/edge/worker_api/datamodels.py: ## @@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase): ) -class WorkerStateBody(BaseModel): +class EdgeJobBase(BaseModel): +"""Basic attributes of a job on the edge worker.""" + +dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to which the task belongs.") +task_id: str = Field(title="Task ID", description="Task name in the DAG.") +run_id: str = Field(title="Run ID", description="Run ID of the DAG execution.") +map_index: int = Field( +title="Map Index", +description="For dynamically mapped tasks the mapping number, -1 if the task is not mapped.", +) +try_number: int = Field(title="Try Number", description="The number of attempt to execute this task.") + +@property +def key(self) -> TaskInstanceKey: +return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index) + + +class EdgeJobFetched(EdgeJobBase): +"""Job that is to be executed on the edge worker.""" + +command: list[str] = Field(title="Command", description="Command line to use to execute the job.") +concurrency_slots: int = Field(description="Number of concurrency slots the job requires.") + + +class WorkerQueuesBase(BaseModel): +"""Queues that a worker supports to run jobs on.""" + +queues: Optional[List[str]] = Field( # noqa: UP006, UP007 - prevent pytest failing in back-compat Review Comment: and ideally move to `Annotated` type and move description there
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1862845183 ## providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py: ## @@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) -> return e.to_response() # type: ignore[attr-defined] +@provide_session +def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, session=NEW_SESSION) -> Any: +"""Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` endpoint for Airflow 2.10.""" +from flask import request + +try: +auth = request.headers.get("Authorization", "") +jwt_token_authorization(request.path, auth) +queues = body["queues"] if body else None Review Comment:
```py
queues = body.get("queues")
```
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1862844613 ## providers/src/airflow/providers/edge/worker_api/datamodels.py: ## @@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase): ) -class WorkerStateBody(BaseModel): +class EdgeJobBase(BaseModel): +"""Basic attributes of a job on the edge worker.""" + +dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to which the task belongs.") +task_id: str = Field(title="Task ID", description="Task name in the DAG.") +run_id: str = Field(title="Run ID", description="Run ID of the DAG execution.") +map_index: int = Field( +title="Map Index", +description="For dynamically mapped tasks the mapping number, -1 if the task is not mapped.", +) +try_number: int = Field(title="Try Number", description="The number of attempt to execute this task.") + +@property +def key(self) -> TaskInstanceKey: +return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index) + + +class EdgeJobFetched(EdgeJobBase): +"""Job that is to be executed on the edge worker.""" + +command: list[str] = Field(title="Command", description="Command line to use to execute the job.") +concurrency_slots: int = Field(description="Number of concurrency slots the job requires.") + + +class WorkerQueuesBase(BaseModel): +"""Queues that a worker supports to run jobs on.""" + +queues: Optional[List[str]] = Field( # noqa: UP006, UP007 - prevent pytest failing in back-compat Review Comment: Providers are now Py 3.9+, so we can make this `list[str] | None`
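One caveat with `list[str] | None` may be worth illustrating: when annotations are stored as strings (for example under `from __future__ import annotations`), any library that evaluates them at runtime needs the `|` operator on types, which only exists from Python 3.10. The sketch below uses the standard library's `typing.get_type_hints` as a stand-in for such runtime evaluation. The class names `OldStyle` and `NewStyle` are illustrative, and whether this is the failure seen in the PR's back-compat tests is not confirmed by the thread.

```python
import sys
from typing import List, Optional, get_type_hints


class OldStyle:
    # String annotation using typing generics; evaluates on every supported version.
    queues: "Optional[List[str]]" = None


class NewStyle:
    # String annotation using PEP 604 syntax; evaluating it needs Python 3.10+.
    queues: "list[str] | None" = None


def resolve(cls: type) -> object:
    """Evaluate the stored annotation string; return the TypeError if that fails."""
    try:
        return get_type_hints(cls, globalns=globals())["queues"]
    except TypeError as exc:
        return exc


old_hint = resolve(OldStyle)
new_hint = resolve(NewStyle)
print(sys.version_info[:2], old_hint, new_hint)
```

On Python 3.10 and later both annotations resolve; on 3.9 the second one is expected to come back as a `TypeError`, because `list[str] | None` cannot be evaluated there.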
Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
kaxil commented on code in PR #44433: URL: https://github.com/apache/airflow/pull/44433#discussion_r1862844010 ## providers/src/airflow/providers/edge/worker_api/datamodels.py: ## @@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase): ) -class WorkerStateBody(BaseModel): +class EdgeJobBase(BaseModel): +"""Basic attributes of a job on the edge worker.""" + +dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to which the task belongs.") +task_id: str = Field(title="Task ID", description="Task name in the DAG.") +run_id: str = Field(title="Run ID", description="Run ID of the DAG execution.") +map_index: int = Field( +title="Map Index", +description="For dynamically mapped tasks the mapping number, -1 if the task is not mapped.", +) Review Comment: While we have a comment about using `str` because of connexion, we have it as int here? Can we make it one or the other in the entire edge provider?
[PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]
jscheffl opened a new pull request, #44433: URL: https://github.com/apache/airflow/pull/44433 Follow-up PR as incremental part of https://github.com/apache/airflow/pull/44311 and #44330 Note: Only the last commit is the relevant change, the first 8 commits are from https://github.com/apache/airflow/pull/44311 To prepare EdgeWorker to be independent of the AIP-44 Internal API, this PR is the third step in adding/migrating to FastAPI. The calls to the "Jobs" API to (1) fetch a job and (2) report job state are now real REST API calls that do not use the internal API. I would separate the other internal API calls into follow-up PRs, as this one is already quite large, especially because the Connexion API + Swagger for the ongoing Airflow 2.10 support need to be generated manually, whereas the main workstream for Airflow 3 uses FastAPI.