Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-12-01 Thread via GitHub


potiuk commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864885355


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select, update
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+SessionDep,
+create_openapi_http_exception_doc,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+"/fetch/{worker_name}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def fetch(
+worker_name: str,
+body: Annotated[
+WorkerQueuesBody,
+Body(
+title="Log data chunks",
+description="The worker remote has no access to log sink and with this can send log chunks to the central site.",

Review Comment:
   Wait... It's not human :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-12-01 Thread via GitHub


potiuk commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864885260


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select, update
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+SessionDep,
+create_openapi_http_exception_doc,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+"/fetch/{worker_name}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def fetch(
+worker_name: str,
+body: Annotated[
+WorkerQueuesBody,
+Body(
+title="Log data chunks",
+description="The worker remote has no access to log sink and with this can send log chunks to the central site.",

Review Comment:
   Indeed. Almost un-human with this check 😱 






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-12-01 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864881282


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select, update
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+SessionDep,
+create_openapi_http_exception_doc,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+"/fetch/{worker_name}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def fetch(
+worker_name: str,
+body: Annotated[
+WorkerQueuesBody,
+Body(
+title="Log data chunks",
+description="The worker remote has no access to log sink and with this can send log chunks to the central site.",

Review Comment:
   Copilot is getting smarter - nice






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-12-01 Thread via GitHub


potiuk commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864878987


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select, update
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+SessionDep,
+create_openapi_http_exception_doc,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+"/fetch/{worker_name}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def fetch(
+worker_name: str,
+body: Annotated[
+WorkerQueuesBody,
+Body(
+title="Log data chunks",
+description="The worker remote has no access to log sink and with this can send log chunks to the central site.",

Review Comment:
   AHA.. Cool.






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-12-01 Thread via GitHub


jscheffl merged PR #44433:
URL: https://github.com/apache/airflow/pull/44433





Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-12-01 Thread via GitHub


jscheffl commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864801279


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select, update
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+SessionDep,
+create_openapi_http_exception_doc,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+"/fetch/{worker_name}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def fetch(
+worker_name: str,
+body: Annotated[
+WorkerQueuesBody,
+Body(
+title="Log data chunks",
+description="The worker remote has no access to log sink and with this can send log chunks to the central site.",

Review Comment:
   @potiuk I thought so too at first... but on reading it again, I really had
made a copy-and-paste error, and the description was meant for a different case.
   Here the worker reports the queues and capacity it supports, which the
service uses as filter criteria to select the next job.
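
As a rough illustration of that filtering (a sketch, not the PR's code): `QueuedJob` and `pick_next_job` are hypothetical names, and the capacity check is an assumption, since the `fetch` route quoted in this thread filters only on `queues`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class QueuedJob:
    """Hypothetical stand-in for a queued EdgeJobModel row."""

    task_id: str
    queue: str
    concurrency_slots: int
    queued_order: int  # lower value means the job was queued earlier


def pick_next_job(jobs, queues, free_concurrency):
    """Return the oldest job matching the worker's queues and capacity, or None."""
    candidates = [
        job
        for job in jobs
        # An empty or missing queue list means "accept any queue".
        if (not queues or job.queue in queues)
        # Assumed capacity rule: the job must fit into the free slots.
        and job.concurrency_slots <= free_concurrency
    ]
    return min(candidates, key=lambda job: job.queued_order, default=None)
```

A worker that reports `queues=["gpu"]` and `free_concurrency=1` would skip older jobs on other queues and any job needing more than one slot.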






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


potiuk commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864574326


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select, update
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+SessionDep,
+create_openapi_http_exception_doc,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+"/fetch/{worker_name}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def fetch(
+worker_name: str,
+body: Annotated[
+WorkerQueuesBody,
+Body(
+title="Log data chunks",
+description="The worker remote has no access to log sink and with this can send log chunks to the central site.",

Review Comment:
   That's a strange suggestion copilot






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


Copilot commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864570662


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select, update
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+SessionDep,
+create_openapi_http_exception_doc,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+"/fetch/{worker_name}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def fetch(
+worker_name: str,
+body: Annotated[
+WorkerQueuesBody,
+Body(
+title="Log data chunks",
+description="The worker remote has no access to log sink and with this can send log chunks to the central site.",

Review Comment:
   The description for the `body` parameter in the `fetch` function is 
incorrect. It should describe the purpose of `WorkerQueuesBody`.
   ```suggestion
   description="The queues from which the worker can fetch jobs.",
   ```






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


jscheffl commented on PR #44433:
URL: https://github.com/apache/airflow/pull/44433#issuecomment-2509403396

   > I will wait for 1 and 2 to be merged :)
   
   You just merged 2 after I merged 1... so the ball is in your court now :-D





Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


jscheffl commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864437361


##
providers/src/airflow/providers/edge/worker_api/datamodels.py:
##
@@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase):
 )
 
 
-class WorkerStateBody(BaseModel):
+class EdgeJobBase(BaseModel):
+    """Basic attributes of a job on the edge worker."""
+
+    dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to which the task belongs.")
+    task_id: str = Field(title="Task ID", description="Task name in the DAG.")
+    run_id: str = Field(title="Run ID", description="Run ID of the DAG execution.")
+    map_index: int = Field(
+        title="Map Index",
+        description="For dynamically mapped tasks the mapping number, -1 if the task is not mapped.",
+    )
+    try_number: int = Field(title="Try Number", description="The number of attempt to execute this task.")
+
+    @property
+    def key(self) -> TaskInstanceKey:
+        return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index)
+
+
+class EdgeJobFetched(EdgeJobBase):
+    """Job that is to be executed on the edge worker."""
+
+    command: list[str] = Field(title="Command", description="Command line to use to execute the job.")
+    concurrency_slots: int = Field(description="Number of concurrency slots the job requires.")
+
+
+class WorkerQueuesBase(BaseModel):
+    """Queues that a worker supports to run jobs on."""
+
+    queues: Optional[List[str]] = Field(  # noqa: UP006, UP007 - prevent pytest failing in back-compat

Review Comment:
   Okay, I pushed a commit to clean this up; it seems the Pydantic upgrade
fixed it. The change is in the latest commit.






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


jscheffl commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864424751


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select
+from sqlalchemy.orm import Session  # noqa: TCH002
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+create_openapi_http_exception_doc,
+get_session,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+    "/fetch/{worker_name}",
+    dependencies=[Depends(jwt_token_authorization_rest)],
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+)
+def fetch(
+    worker_name: str,
+    body: Annotated[
+        WorkerQueuesBody,
+        Body(
+            title="Log data chunks",
+            description="The worker remote has no access to log sink and with this can send log chunks to the central site.",
+        ),
+    ],
+    session: Annotated[Session, Depends(get_session)],
+) -> EdgeJobFetched | None:
+    """Fetch a job to execute on the edge worker."""
+    query = (
+        select(EdgeJobModel)
+        .where(EdgeJobModel.state == TaskInstanceState.QUEUED)
+        .order_by(EdgeJobModel.queued_dttm)
+    )
+    if body.queues:
+        query = query.where(EdgeJobModel.queue.in_(body.queues))
+    query = query.limit(1)
+    query = with_row_locks(query, of=EdgeJobModel, session=session, skip_locked=True)
+    job: EdgeJobModel = session.scalar(query)
+    if not job:
+        return None
+    job.state = TaskInstanceState.RUNNING
+    job.edge_worker = worker_name
+    job.last_update = timezone.utcnow()
+    session.commit()
+    return EdgeJobFetched(
+        dag_id=job.dag_id,
+        task_id=job.task_id,
+        run_id=job.run_id,
+        map_index=job.map_index,
+        try_number=job.try_number,
+        command=literal_eval(job.command),
+        concurrency_slots=job.concurrency_slots,
+    )
+
+
+@jobs_router.patch(
+    "/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}",
+    dependencies=[Depends(jwt_token_authorization_rest)],
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+)
+def state(
+    dag_id: Annotated[str, WorkerApiDocs.dag_id],
+    task_id: Annotated[str, WorkerApiDocs.task_id],
+    run_id: Annotated[str, WorkerApiDocs.run_id],
+    try_number: Annotated[int, WorkerApiDocs.try_number],
+    map_index: Annotated[int, WorkerApiDocs.map_index],
+    state: Annotated[TaskInstanceState, WorkerApiDocs.state],
+    session: Annotated[Session, Depends(get_session)],
+) -> None:
+    """Update the state of a job running on the edge worker."""
+    query = select(EdgeJobModel).where(
+        EdgeJobModel.dag_id == dag_id,
+        EdgeJobModel.task_id == task_id,
+        EdgeJobModel.run_id == run_id,
+        EdgeJobModel.map_index == map_index,
+        EdgeJobModel.try_number == try_number,
+    )
+    job: EdgeJobModel = session.scalar(query)
+    if job:
+        job.state = state
+        job.last_update = timezone.utcnow()
+        session.commit()

Review Comment:
   Ah, that is a good thing to optimize. It should have caught my eye
earlier!
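
The suggestion being answered is not quoted in this message. One common way to tighten a select-then-modify sequence like the one in `state` is a single `UPDATE` statement. The sketch below shows that idea with the standard-library `sqlite3` module rather than SQLAlchemy; the table name `edge_job`, the helper names and the column set are assumptions for illustration only, not the PR's schema.

```python
import sqlite3

# Assumed key columns, mirroring the path parameters of the `state` route.
KEY_COLUMNS = ("dag_id", "task_id", "run_id", "map_index", "try_number")


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory table holding one running job (illustrative schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE edge_job (dag_id TEXT, task_id TEXT, run_id TEXT, "
        "map_index INTEGER, try_number INTEGER, state TEXT, last_update TEXT)"
    )
    conn.execute(
        "INSERT INTO edge_job VALUES "
        "('d', 't', 'r', -1, 1, 'running', '2024-11-30T00:00:00')"
    )
    conn.commit()
    return conn


def set_job_state(conn, key, state, now) -> int:
    """Update one job in a single statement and return the number of rows changed."""
    where = " AND ".join(f"{col} = ?" for col in KEY_COLUMNS)
    cur = conn.execute(
        f"UPDATE edge_job SET state = ?, last_update = ? WHERE {where}",
        (state, now, *key),
    )
    conn.commit()
    return cur.rowcount
```

The row is never loaded into Python; a return value of 0 simply means no job matched the key.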




Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


jscheffl commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864421800


##
providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py:
##
@@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) ->
         return e.to_response()  # type: ignore[attr-defined]
 
 
+@provide_session
+def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, session=NEW_SESSION) -> Any:
+    """Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` endpoint for Airflow 2.10."""
+    from flask import request
+
+    try:
+        auth = request.headers.get("Authorization", "")
+        jwt_token_authorization(request.path, auth)
+        queues = body["queues"] if body else None
+        free_concurrency = body["free_concurrency"] if body else 1
+        request_obj = WorkerQueuesBody(queues=queues, free_concurrency=free_concurrency)
+        job: EdgeJobFetched | None = fetch(worker_name, request_obj, session)
+        return job.model_dump() if job is not None else None
+    except HTTPException as e:
+        return e.to_response()  # type: ignore[attr-defined]
+
+
+@provide_session
+def job_state_v2(
+    dag_id: str,
+    task_id: str,
+    run_id: str,
+    try_number: int,
+    map_index: str,  # Note: Connexion can not have negative numbers in path parameters, use string therefore

Review Comment:
   As answered above: internally I want `int` everywhere; only the Connexion
wrapper entry point uses `str`, to work around Flask's limitation with negative
integer values.






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


jscheffl commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864421516


##
providers/src/airflow/providers/edge/worker_api/datamodels.py:
##
@@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase):
 )
 
 
-class WorkerStateBody(BaseModel):
+class EdgeJobBase(BaseModel):
+    """Basic attributes of a job on the edge worker."""
+
+    dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to which the task belongs.")
+    task_id: str = Field(title="Task ID", description="Task name in the DAG.")
+    run_id: str = Field(title="Run ID", description="Run ID of the DAG execution.")
+    map_index: int = Field(
+        title="Map Index",
+        description="For dynamically mapped tasks the mapping number, -1 if the task is not mapped.",
+    )

Review Comment:
   Only Flask has a parsing problem here, so the Airflow 2.10 REST
implementation in Connexion uses "string" for the map index, because the value
can be negative. I convert it with int() before it reaches the logic.
   
   The primary implementation is FastAPI, which handles negative integer values
correctly. So map_index is treated as a string only in the Connexion OpenAPI
spec and in `_v2_routes.py`, to make it work. Otherwise the FastAPI and
Pydantic models should be consistent.
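
A minimal sketch of that boundary conversion, assuming a hypothetical wrapper `job_state_compat` and internal function `set_state_internal` (neither name is from the PR): the string is accepted at the Connexion-facing edge and converted once, so everything behind it works with `int`.

```python
from __future__ import annotations


def parse_map_index(raw: str) -> int:
    """Convert the path-parameter string into the int used internally."""
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(f"map_index must be an integer, got {raw!r}") from exc


def set_state_internal(task_id: str, map_index: int) -> tuple[str, int]:
    """Hypothetical internal logic; it only ever sees an int map_index."""
    return (task_id, map_index)


def job_state_compat(task_id: str, map_index: str) -> tuple[str, int]:
    """Hypothetical compatibility wrapper taking map_index as a string."""
    return set_state_internal(task_id, parse_map_index(map_index))
```

With this shape, `-1` for an unmapped task passes through the string-typed path parameter and arrives as the integer `-1`.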






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


jscheffl commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864376303


##
providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py:
##
@@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) ->
         return e.to_response()  # type: ignore[attr-defined]
 
 
+@provide_session
+def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, session=NEW_SESSION) -> Any:

Review Comment:
   That would be the type in FastAPI. But here we are in the "compatibility
layer", which takes the FastAPI functions and wraps them into the same endpoint
for Connexion. Connexion receives the JSON model as a dumped dict, not as a
Pydantic object.
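
To make the layering concrete, here is a rough sketch using standard-library dataclasses in place of Pydantic. `QueuesRequest`, `FetchedJob`, `fetch_typed` and `fetch_compat` are hypothetical stand-ins for `WorkerQueuesBody`, `EdgeJobFetched`, `fetch` and `job_fetch_v2`, and the toy job data is invented for the example.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class QueuesRequest:
    """Stand-in for the typed request body used by the primary route."""

    queues: Optional[list[str]]
    free_concurrency: int


@dataclass
class FetchedJob:
    """Stand-in for the typed response model."""

    task_id: str
    concurrency_slots: int


def fetch_typed(worker_name: str, request: QueuesRequest) -> FetchedJob | None:
    """Primary, typed function: a toy rule that only serves the 'default' queue."""
    if request.queues and "default" not in request.queues:
        return None
    return FetchedJob(task_id="demo_task", concurrency_slots=1)


def fetch_compat(worker_name: str, body: dict[str, Any] | None = None) -> dict[str, Any] | None:
    """Compatibility wrapper: plain dict in, typed call inside, plain dict out."""
    queues = body.get("queues") if body else None
    free_concurrency = body.get("free_concurrency", 1) if body else 1
    job = fetch_typed(worker_name, QueuesRequest(queues, free_concurrency))
    return asdict(job) if job is not None else None
```

The wrapper keeps `dict[str, Any]` in its signature because that is what arrives from the older framework; the typed model exists only between the two conversions.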



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-30 Thread via GitHub


jscheffl commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1864373918


##
providers/src/airflow/providers/edge/worker_api/datamodels.py:
##
@@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase):
 )
 
 
-class WorkerStateBody(BaseModel):
+class EdgeJobBase(BaseModel):
+"""Basic attributes of a job on the edge worker."""
+
+dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to 
which the task belongs.")
+task_id: str = Field(title="Task ID", description="Task name in the DAG.")
+run_id: str = Field(title="Run ID", description="Run ID of the DAG 
execution.")
+map_index: int = Field(
+title="Map Index",
+description="For dynamically mapped tasks the mapping number, -1 if 
the task is not mapped.",
+)
+try_number: int = Field(title="Try Number", description="The number of 
attempt to execute this task.")
+
+@property
+def key(self) -> TaskInstanceKey:
+return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, 
self.try_number, self.map_index)
+
+
+class EdgeJobFetched(EdgeJobBase):
+"""Job that is to be executed on the edge worker."""
+
+command: list[str] = Field(title="Command", description="Command line to 
use to execute the job.")
+concurrency_slots: int = Field(description="Number of concurrency slots 
the job requires.")
+
+
+class WorkerQueuesBase(BaseModel):
+"""Queues that a worker supports to run jobs on."""
+
+queues: Optional[List[str]] = Field(  # noqa: UP006, UP007 - prevent 
pytest failing in back-compat

Review Comment:
   I have this on my list to check. Maybe the problem went away in the last
few days with the switch to Pydantic 2.10.2. It was a problem when pytest was
running in back-compat mode and Pydantic could not load because it was not
able to evaluate the type.
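
The thread does not state the exact cause. One plausible mechanism, sketched here as an assumption, is that a string annotation such as `list[str] | None` has to be evaluated at runtime, and the `|` operator on types only exists from Python 3.10 (PEP 604). The class names and the helper `can_resolve` are hypothetical.

```python
import sys
from typing import List, Optional, get_type_hints


class OldStyle:
    queues: "Optional[List[str]]"  # resolvable on every supported Python


class NewStyle:
    queues: "list[str] | None"  # needs PEP 604 support when evaluated


def can_resolve(cls: type) -> bool:
    """Return True if the string annotations of cls can be evaluated."""
    try:
        get_type_hints(cls, globalns=globals())
        return True
    except TypeError:
        # Raised on Python < 3.10, where `list[str] | None` is not valid.
        return False


# The new-style annotation is only expected to resolve on Python 3.10+.
expected_new_style = sys.version_info >= (3, 10)
```

A library that evaluates annotations while building a model runs into the same limitation unless it backports the syntax, which would explain why `Optional[List[str]]` was kept as a workaround.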






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-29 Thread via GitHub


potiuk commented on PR #44433:
URL: https://github.com/apache/airflow/pull/44433#issuecomment-2508408909

   > UPDATE: Okay now I realize that the internal API is gone but all the 
previous functions are moved to provider package... need to catch-up coming 
home...
   
   :eyes: 





Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-29 Thread via GitHub


jscheffl commented on PR #44433:
URL: https://github.com/apache/airflow/pull/44433#issuecomment-2508280842

   > I will wait for 1 and 2 to be merged :)
   
   As internal API is now "gone" I will have a hard time getting #1+#2 merged. 
As there are some merge conflicts as well I'll focus on PR#3 + #4...
   
   But anyway let me resolve the conflicts and integrate the feedback from 
Kaxil then it might be cleaner already.





Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-29 Thread via GitHub


potiuk commented on PR #44433:
URL: https://github.com/apache/airflow/pull/44433#issuecomment-2507681738

   I will wait for 1 and 2 to be merged :)





Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1862845850


##
providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py:
##
@@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], 
session=NEW_SESSION) ->
 return e.to_response()  # type: ignore[attr-defined]
 
 
+@provide_session
+def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, 
session=NEW_SESSION) -> Any:

Review Comment:
   Isn't the return type deterministic: `EdgeJobFetched | None`?






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1862849168


##
providers/src/airflow/providers/edge/worker_api/routes/jobs.py:
##
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from ast import literal_eval
+from typing import Annotated
+
+from sqlalchemy import select
+from sqlalchemy.orm import Session  # noqa: TCH002
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
+from airflow.providers.edge.worker_api.datamodels import (
+EdgeJobFetched,
+WorkerApiDocs,
+WorkerQueuesBody,
+)
+from airflow.providers.edge.worker_api.routes._v2_compat import (
+AirflowRouter,
+Body,
+Depends,
+create_openapi_http_exception_doc,
+get_session,
+status,
+)
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import with_row_locks
+from airflow.utils.state import TaskInstanceState
+
+jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs")
+
+
+@jobs_router.get(
+"/fetch/{worker_name}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def fetch(
+worker_name: str,
+body: Annotated[
+WorkerQueuesBody,
+Body(
+title="Log data chunks",
+description="The worker remote has no access to log sink and with 
this can send log chunks to the central site.",
+),
+],
+session: Annotated[Session, Depends(get_session)],
+) -> EdgeJobFetched | None:
+"""Fetch a job to execute on the edge worker."""
+query = (
+select(EdgeJobModel)
+.where(EdgeJobModel.state == TaskInstanceState.QUEUED)
+.order_by(EdgeJobModel.queued_dttm)
+)
+if body.queues:
+query = query.where(EdgeJobModel.queue.in_(body.queues))
+query = query.limit(1)
+query = with_row_locks(query, of=EdgeJobModel, session=session, 
skip_locked=True)
+job: EdgeJobModel = session.scalar(query)
+if not job:
+return None
+job.state = TaskInstanceState.RUNNING
+job.edge_worker = worker_name
+job.last_update = timezone.utcnow()
+session.commit()
+return EdgeJobFetched(
+dag_id=job.dag_id,
+task_id=job.task_id,
+run_id=job.run_id,
+map_index=job.map_index,
+try_number=job.try_number,
+command=literal_eval(job.command),
+concurrency_slots=job.concurrency_slots,
+)
+
+
+@jobs_router.patch(
+"/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}",
+dependencies=[Depends(jwt_token_authorization_rest)],
+responses=create_openapi_http_exception_doc(
+[
+status.HTTP_400_BAD_REQUEST,
+status.HTTP_403_FORBIDDEN,
+]
+),
+)
+def state(
+dag_id: Annotated[str, WorkerApiDocs.dag_id],
+task_id: Annotated[str, WorkerApiDocs.task_id],
+run_id: Annotated[str, WorkerApiDocs.run_id],
+try_number: Annotated[int, WorkerApiDocs.try_number],
+map_index: Annotated[int, WorkerApiDocs.map_index],
+state: Annotated[TaskInstanceState, WorkerApiDocs.state],
+session: Annotated[Session, Depends(get_session)],
+) -> None:
+"""Update the state of a job running on the edge worker."""
+query = select(EdgeJobModel).where(
+EdgeJobModel.dag_id == dag_id,
+EdgeJobModel.task_id == task_id,
+EdgeJobModel.run_id == run_id,
+EdgeJobModel.map_index == map_index,
+EdgeJobModel.try_number == try_number,
+)
+job: EdgeJobModel = session.scalar(query)
+if job:
+job.state = state
+job.last_update = timezone.utcnow()
+session.commit()

Review Comment:
   ```py
   update(EdgeJobModel).where(
   EdgeJobModel.dag_id == dag_id,
   EdgeJobModel.task_id == task_id,
   EdgeJobModel.run_id == run_id,
   EdgeJobModel.map_index == map_index,
   EdgeJobModel.try_number == try_number,
   )
   ).values(state=state, last_update=t

Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1862846445


##
providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py:
##
@@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], 
session=NEW_SESSION) ->
 return e.to_response()  # type: ignore[attr-defined]
 
 
+@provide_session
+def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, 
session=NEW_SESSION) -> Any:
+"""Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` 
endpoint for Airflow 2.10."""
+from flask import request
+
+try:
+auth = request.headers.get("Authorization", "")
+jwt_token_authorization(request.path, auth)
+queues = body["queues"] if body else None
+free_concurrency = body["free_concurrency"] if body else 1
+request_obj = WorkerQueuesBody(queues=queues, 
free_concurrency=free_concurrency)
+job: EdgeJobFetched | None = fetch(worker_name, request_obj, session)
+return job.model_dump() if job is not None else None
+except HTTPException as e:
+return e.to_response()  # type: ignore[attr-defined]
+
+
+@provide_session
+def job_state_v2(
+dag_id: str,
+task_id: str,
+run_id: str,
+try_number: int,
+map_index: str,  # Note: Connexion can not have negative numbers in path 
parameters, use string therefore

Review Comment:
   It's `str` here, so why `int` above?
https://github.com/apache/airflow/pull/44433/commits/a7df2861cf97ae1c8e14b766bbb7b8d687058acc#r1862844010






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1862845347


##
providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py:
##
@@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], 
session=NEW_SESSION) ->
 return e.to_response()  # type: ignore[attr-defined]
 
 
+@provide_session
+def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, 
session=NEW_SESSION) -> Any:
+"""Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` 
endpoint for Airflow 2.10."""
+from flask import request
+
+try:
+auth = request.headers.get("Authorization", "")
+jwt_token_authorization(request.path, auth)
+queues = body["queues"] if body else None
+free_concurrency = body["free_concurrency"] if body else 1

Review Comment:
   ```py
   free_concurrency = body.get("free_concurrency", 1)
   ```
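
One caveat with the `.get()` form: the quoted signature declares `body: dict[str, Any] | None = None`, so calling `body.get(...)` directly would raise `AttributeError` when no body is sent. A minimal sketch that keeps the suggested `.get()` style and still tolerates a missing body follows; the helper name `read_fetch_body` is hypothetical.

```python
from typing import Any, Optional


def read_fetch_body(body: Optional[dict[str, Any]]) -> tuple[Optional[list[str]], int]:
    """Extract queues and free_concurrency, tolerating a missing body or missing keys."""
    data = body or {}  # body may be None when the request carries no JSON payload
    queues = data.get("queues")  # None when the key is absent
    free_concurrency = data.get("free_concurrency", 1)  # default of 1, as in the diff
    return queues, free_concurrency
```

Falling back to an empty dict keeps both lookups on the same code path, so the `if body else ...` guards from the original diff are no longer needed.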
   






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1862844761


##
providers/src/airflow/providers/edge/worker_api/datamodels.py:
##
@@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase):
 )
 
 
-class WorkerStateBody(BaseModel):
+class EdgeJobBase(BaseModel):
+"""Basic attributes of a job on the edge worker."""
+
+dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to 
which the task belongs.")
+task_id: str = Field(title="Task ID", description="Task name in the DAG.")
+run_id: str = Field(title="Run ID", description="Run ID of the DAG 
execution.")
+map_index: int = Field(
+title="Map Index",
+description="For dynamically mapped tasks the mapping number, -1 if 
the task is not mapped.",
+)
+try_number: int = Field(title="Try Number", description="The number of 
attempt to execute this task.")
+
+@property
+def key(self) -> TaskInstanceKey:
+return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, 
self.try_number, self.map_index)
+
+
+class EdgeJobFetched(EdgeJobBase):
+"""Job that is to be executed on the edge worker."""
+
+command: list[str] = Field(title="Command", description="Command line to 
use to execute the job.")
+concurrency_slots: int = Field(description="Number of concurrency slots 
the job requires.")
+
+
+class WorkerQueuesBase(BaseModel):
+"""Queues that a worker supports to run jobs on."""
+
+queues: Optional[List[str]] = Field(  # noqa: UP006, UP007 - prevent 
pytest failing in back-compat

Review Comment:
   And ideally switch to an `Annotated` type and move the description there.
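
A standard-library sketch of what moving the description into `Annotated` means. `Doc` is a hypothetical stand-in for Pydantic's `Field(description=...)`, and the description text is made up, since the original one is not visible in this thread. With Pydantic the alias would presumably read `Annotated[Optional[List[str]], Field(description=...)]`.

```python
from dataclasses import dataclass
from typing import Annotated, List, Optional, get_type_hints


@dataclass(frozen=True)
class Doc:
    """Hypothetical stand-in for pydantic's Field(description=...)."""

    description: str


# The description travels with the type instead of the default-value assignment.
QueueList = Annotated[Optional[List[str]], Doc("Queues the worker pulls jobs from.")]


class WorkerQueues:
    queues: QueueList = None


def describe(cls: type, field: str) -> str:
    """Read the description attached to an Annotated field."""
    hints = get_type_hints(cls, globalns=globals(), include_extras=True)
    return hints[field].__metadata__[0].description
```

The benefit is that the annotated alias can be reused for several models or route parameters without repeating the description.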






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1862845183


##
providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py:
##
@@ -90,6 +98,44 @@ def set_state_v2(worker_name: str, body: dict[str, Any], 
session=NEW_SESSION) ->
 return e.to_response()  # type: ignore[attr-defined]
 
 
+@provide_session
+def job_fetch_v2(worker_name: str, body: dict[str, Any] | None = None, 
session=NEW_SESSION) -> Any:
+"""Handle Edge Worker API `/edge_worker/v1/jobs/fetch/{worker_name}` 
endpoint for Airflow 2.10."""
+from flask import request
+
+try:
+auth = request.headers.get("Authorization", "")
+jwt_token_authorization(request.path, auth)
+queues = body["queues"] if body else None

Review Comment:
   ```py
   queues = body.get("queues")
   ```
   






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1862844613


##
providers/src/airflow/providers/edge/worker_api/datamodels.py:
##
@@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase):
 )
 
 
-class WorkerStateBody(BaseModel):
+class EdgeJobBase(BaseModel):
+"""Basic attributes of a job on the edge worker."""
+
+dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to 
which the task belongs.")
+task_id: str = Field(title="Task ID", description="Task name in the DAG.")
+run_id: str = Field(title="Run ID", description="Run ID of the DAG 
execution.")
+map_index: int = Field(
+title="Map Index",
+description="For dynamically mapped tasks the mapping number, -1 if 
the task is not mapped.",
+)
+try_number: int = Field(title="Try Number", description="The number of 
attempt to execute this task.")
+
+@property
+def key(self) -> TaskInstanceKey:
+return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, 
self.try_number, self.map_index)
+
+
+class EdgeJobFetched(EdgeJobBase):
+"""Job that is to be executed on the edge worker."""
+
+command: list[str] = Field(title="Command", description="Command line to 
use to execute the job.")
+concurrency_slots: int = Field(description="Number of concurrency slots 
the job requires.")
+
+
+class WorkerQueuesBase(BaseModel):
+"""Queues that a worker supports to run jobs on."""
+
+queues: Optional[List[str]] = Field(  # noqa: UP006, UP007 - prevent 
pytest failing in back-compat

Review Comment:
   Providers are now Py 3.9+, so we can make this `list[str] | None`






Re: [PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44433:
URL: https://github.com/apache/airflow/pull/44433#discussion_r1862844010


##
providers/src/airflow/providers/edge/worker_api/datamodels.py:
##
@@ -53,19 +55,56 @@ class JsonRpcRequest(JsonRpcRequestBase):
 )
 
 
-class WorkerStateBody(BaseModel):
+class EdgeJobBase(BaseModel):
+"""Basic attributes of a job on the edge worker."""
+
+dag_id: str = Field(title="Dag ID", description="Identifier of the DAG to 
which the task belongs.")
+task_id: str = Field(title="Task ID", description="Task name in the DAG.")
+run_id: str = Field(title="Run ID", description="Run ID of the DAG 
execution.")
+map_index: int = Field(
+title="Map Index",
+description="For dynamically mapped tasks the mapping number, -1 if 
the task is not mapped.",
+)

Review Comment:
   We have a comment about using `str` because of Connexion, yet we have it as
`int` here? Can we make it one or the other in the entire edge provider?






[PR] Migrate Edge calls for Worker to FastAPI part 3 - Jobs routes [airflow]

2024-11-27 Thread via GitHub


jscheffl opened a new pull request, #44433:
URL: https://github.com/apache/airflow/pull/44433

   Follow-up PR as incremental part of 
https://github.com/apache/airflow/pull/44311 and #44330
   
   Note: Only the last commit is the relevant change, the first 8 commits are 
from https://github.com/apache/airflow/pull/44311
   
   To prepare EdgeWorker to be independent of AIP-44 Internal API, this PR is 
the third step in adding/migrating to FastAPI. The calls to "Jobs" API to (1) 
fetch a job and (2) report job state are now real REST API calls, not using 
internal API.
   
   I would split the other internal API calls into follow-up PRs, as this one
is already quite large, especially because for the ongoing Airflow 2.10 support
the Connexion API and Swagger spec need to be generated manually, whereas the
main workstream for Airflow 3 uses FastAPI.

