Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
pierrejeambrun merged PR #44238:
URL: https://github.com/apache/airflow/pull/44238

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1853543099

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),

Review Comment:
Added it, but it's not that useful for logs, since there is no format to it; we just return a log (string). PTAL.

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=TaskInstancesLogResponse,
+)
+def get_log(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,

Review Comment:
I have updated it as per the suggestions. PTAL.

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1853193757

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+

Review Comment:
```suggestion
from pydantic import PositiveInt
```
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1852303940

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=TaskInstancesLogResponse,
+)
+def get_log(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,

Review Comment:
```suggestion
    task_try_number: PositiveInt,
```
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1852697235

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=TaskInstancesLogResponse,
+)
+def get_log(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,

Review Comment:
I forgot that try_number must be > 0. In that case, we should use `PositiveInt`.
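The choice between the two suggested types comes down to whether 0 is accepted. A quick check with pydantic's `TypeAdapter`, assuming pydantic v2 is installed; the `accepts` helper is purely illustrative.

```python
from pydantic import NonNegativeInt, PositiveInt, TypeAdapter, ValidationError

positive = TypeAdapter(PositiveInt)
non_negative = TypeAdapter(NonNegativeInt)


def accepts(adapter: TypeAdapter, value: int) -> bool:
    """Return True if the value passes the adapter's validation."""
    try:
        adapter.validate_python(value)
    except ValidationError:
        return False
    return True


# Columns: value, NonNegativeInt accepts it, PositiveInt accepts it.
for value in (-1, 0, 1):
    print(value, accepts(non_negative, value), accepts(positive, value))
# -1 False False
# 0 True False
# 1 True True
```

Since a valid try number is at least 1 (per the comment above), `PositiveInt` is the type that also rejects 0, whereas `NonNegativeInt` would let it through.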
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
bbovenzi commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1852666907

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=TaskInstancesLogResponse,
+)
+def get_log(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,

Review Comment:
`task_try_number: int = 1`

Let's add a default try number value.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
bbovenzi commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1852666907

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=TaskInstancesLogResponse,
+)
+def get_log(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,

Review Comment:
`try_number: int = 1`

Let's add a default try number value. Also, just `try_number` is sufficient; there is no need for the `task_` prefix.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
pierrejeambrun commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1852391515

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),

Review Comment:
We are missing the documentation for the `text/plain` mimetype.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1852315831

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=TaskInstancesLogResponse,
+)
+def get_log(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,
+    accept: HeaderAcceptJsonOrText,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+    full_content: bool = False,
+    map_index: int = -1,
+    token: str | None = None,
+):
+    """Get logs for a specific task instance."""
+    if not token:
+        metadata = {}
+    else:
+        try:
+            metadata = URLSafeSerializer(request.app.state.secret_key).loads(token)
+        except BadSignature:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API."
+            )
+
+    if task_try_number <= 0:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "task_try_number must be a positive integer")

Review Comment:
```suggestion
```
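The diff context above turns the optional `token` query parameter back into the log reader's metadata with itsdangerous' `URLSafeSerializer`, keyed on the app's secret key, and maps `BadSignature` to a 400. A small round-trip sketch of that pattern; `SECRET_KEY`, the `decode_token` helper, and the metadata keys are illustrative assumptions, and the helper raises `ValueError` where the endpoint raises `HTTPException`.

```python
from typing import Optional

from itsdangerous import BadSignature, URLSafeSerializer

SECRET_KEY = "example-secret"  # hypothetical; the endpoint uses request.app.state.secret_key


def decode_token(token: Optional[str], secret_key: str) -> dict:
    """Mirror the branch in the diff: no token means empty metadata."""
    if not token:
        return {}
    try:
        return URLSafeSerializer(secret_key).loads(token)
    except BadSignature:
        # The endpoint converts this case into an HTTP 400 response.
        raise ValueError("Bad Signature. Please use only the tokens provided by the API.") from None


# A token issued with the same key round-trips; a different key fails verification.
token = URLSafeSerializer(SECRET_KEY).dumps({"end_of_log": False, "log_pos": 42})
print(decode_token(token, SECRET_KEY))  # {'end_of_log': False, 'log_pos': 42}
```

Because the token is signed rather than encrypted, clients can read its contents but cannot forge or alter it without the server's key.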
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1852315135

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=TaskInstancesLogResponse,
+)
+def get_log(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,

Review Comment:
This will automatically raise a 422. We don't need to check separately whether it's less than 0.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1852299397

## airflow/api_fastapi/core_api/routes/public/log.py: ##
@@ -0,0 +1,126 @@
+from __future__ import annotations
+
+from typing import Annotated, Any
+
+from fastapi import Depends, HTTPException, Request, Response, status
+from itsdangerous import BadSignature, URLSafeSerializer
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import Mimetype
+from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.exceptions import TaskNotFound
+from airflow.models import TaskInstance, Trigger
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.log.log_reader import TaskLogReader
+
+task_instances_log_router = AirflowRouter(
+    tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+@task_instances_log_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=TaskInstancesLogResponse,
+)
+def get_log(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,
+    accept: HeaderAcceptJsonOrText,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+    full_content: bool = False,
+    map_index: int = -1,
+    token: str | None = None,
+):
+    """Get logs for a specific task instance."""
+    if not token:
+        metadata = {}
+    else:
+        try:
+            metadata = URLSafeSerializer(request.app.state.secret_key).loads(token)
+        except BadSignature:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API."
+            )
+
+    if metadata.get("download_logs") and metadata["download_logs"]:
+        full_content = True
+
+    if full_content:
+        metadata["download_logs"] = True
+    else:
+        metadata["download_logs"] = False

Review Comment:
I find the existing version to be more readable.
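For reference, the two `if` blocks quoted above boil down to: enable full content if either the `full_content` query flag or a truthy `download_logs` entry in the token metadata says so, then write that decision back into the metadata. A compact equivalent, written as a hypothetical helper and checked against the quoted branching for every input combination; both helper names are illustrative, not from the PR.

```python
from itertools import product


def resolve_download_flag(full_content: bool, metadata: dict) -> bool:
    """Compact form: the query flag or a truthy token flag enables full content."""
    full_content = full_content or bool(metadata.get("download_logs"))
    metadata["download_logs"] = full_content
    return full_content


def resolve_download_flag_original(full_content: bool, metadata: dict) -> bool:
    """The branching as quoted in the diff above."""
    if metadata.get("download_logs") and metadata["download_logs"]:
        full_content = True

    if full_content:
        metadata["download_logs"] = True
    else:
        metadata["download_logs"] = False
    return full_content


# Both versions agree on the return value and the mutated metadata for all inputs.
for flag, token_value in product((False, True), (None, False, True)):
    base = {} if token_value is None else {"download_logs": token_value}
    meta_a, meta_b = dict(base), dict(base)
    assert resolve_download_flag(flag, meta_a) == resolve_download_flag_original(flag, meta_b)
    assert meta_a == meta_b
```

Both forms behave identically, so the choice is purely about readability; the author preferred the explicit version.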
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1852303940 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, Review Comment: ```suggestion task_try_number: NonNegativeInt, ```
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1852304932 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, Review Comment: We can import this from pydantic: `from pydantic import NonNegativeInt`
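This is a minimal sketch of what the suggested annotation does, assuming pydantic v2, where `NonNegativeInt` is `Annotated[int, Ge(0)]`. FastAPI validates path parameters with pydantic, so a negative `task_try_number` should be rejected with a 422 before the handler body runs. `TypeAdapter` is used here only to exercise the type outside a route, and `parse_try_number` is a hypothetical helper, not part of the PR.

```python
from pydantic import NonNegativeInt, TypeAdapter, ValidationError

# Validator for the suggested annotation. FastAPI applies the same pydantic
# validation to a path parameter declared as `task_try_number: NonNegativeInt`.
_try_number_adapter = TypeAdapter(NonNegativeInt)


def parse_try_number(raw: object) -> int:
    """Hypothetical helper: validate a raw path value as a try number."""
    # Lax mode coerces numeric strings such as "3" (path segments arrive as text),
    # then the Ge(0) constraint rejects negative values.
    return _try_number_adapter.validate_python(raw)


if __name__ == "__main__":
    print(parse_try_number("3"))
    try:
        parse_try_number(-1)
    except ValidationError as exc:
        # Inside a FastAPI route this surfaces as an HTTP 422 response.
        print("rejected:", exc.error_count(), "error(s)")
```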
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1852298307 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +accept: HeaderAcceptJsonOrText, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +): +"""Get logs for a specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." +) + +if metadata.get("download_logs") and metadata["download_logs"]: +full_content = True + +if full_content: +metadata["download_logs"] = True +else: +metadata["download_logs"] = False + +task_log_reader = TaskLogReader() + +if not task_log_reader.supports_read: +raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") + +query = ( +select(TaskInstance) +.where( +TaskInstance.task_id == task_id, +TaskInstance.dag_id == dag_id, +TaskInstance.run_id == dag_run_id, +TaskInstance.map_index == map_index, +) +.join(TaskInstance.dag_run) + .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) +) +ti = session.scalar(query) +if ti is None: +query = select(TaskInstanceHistory).where( +TaskInstanceHistory.task_id == task_id, +TaskInstanceHistory.dag_id == dag_id, +TaskInstanceHistory.run_id == dag_run_id, +TaskInstanceHistory.map_index == map_index, +TaskInstanceHistory.try_number == task_try_number, +) +ti = session.scalar(query) + +if ti is None: +metadata["end_of_log"] = True +raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found") + +dag = 
request.app.state.dag_bag.get_dag(dag_id) +if dag: +try: +ti.task = dag.get_task(ti.task_id) +except TaskNotFound: +pass + +logs: Any +if accept == Mimetype.JSON or accept == Mimetype.ANY: # default +logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) +logs = logs[0] if task_try_number is not None else logs Review Comment: I changed the code accordingly. PTAL.
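The lookup in the handler above works in two steps. It first tries the live `TaskInstance`, matched on DAG, run, task and map index. Only if that fails does it fall back to `TaskInstanceHistory`, where `try_number` is also part of the filter. The sketch below models that order of precedence in memory. The `Row` record and the two lists are hypothetical stand-ins for the tables, and SQLAlchemy is not involved.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class Row:
    """Hypothetical stand-in for a TaskInstance / TaskInstanceHistory row."""

    dag_id: str
    run_id: str
    task_id: str
    map_index: int
    try_number: int


def find_ti(
    live: Iterable[Row],
    history: Iterable[Row],
    dag_id: str,
    run_id: str,
    task_id: str,
    try_number: int,
    map_index: int = -1,
) -> Row | None:
    """Return the matching row, mirroring the handler's lookup order."""
    key = (dag_id, run_id, task_id, map_index)
    # 1. Live table: try_number is NOT part of the filter.
    for row in live:
        if (row.dag_id, row.run_id, row.task_id, row.map_index) == key:
            return row
    # 2. History table: the requested try_number must match as well.
    for row in history:
        if (row.dag_id, row.run_id, row.task_id, row.map_index) == key and row.try_number == try_number:
            return row
    # The endpoint turns this case into a 404 "TaskInstance not found".
    return None
```

Note that when a live row exists, it is returned whatever try number was requested.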
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1852297268 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, Review Comment: I have added a condition for this. PTAL.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1852277385 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +accept: HeaderAcceptJsonOrText, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +): +"""Get logs for a specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." Review Comment: I think we should be consistent with earlier APIs. I lean towards `HTTP_400_BAD_REQUEST`.
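The handler decodes the continuation `token` with itsdangerous' `URLSafeSerializer` and maps a `BadSignature` to a 400. The sketch below shows the same round trip using only the standard library. HMAC-SHA256 stands in for itsdangerous, so the token format is *not* compatible with itsdangerous. `BadRequest`, `sign_metadata` and `load_metadata` are hypothetical names.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json


class BadRequest(Exception):
    """Hypothetical error carrying the HTTP status the API would return."""

    status_code = 400


def _sig(secret: bytes, payload: bytes) -> str:
    digest = hmac.new(secret, payload, hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).decode()


def sign_metadata(secret: bytes, metadata: dict) -> str:
    """Serialize metadata to a URL-safe token of the form <payload>.<signature>."""
    payload = base64.urlsafe_b64encode(json.dumps(metadata, sort_keys=True).encode())
    return f"{payload.decode()}.{_sig(secret, payload)}"


def load_metadata(secret: bytes, token: str | None) -> dict:
    """Return {} for no token, the decoded metadata for a valid one, else raise a 400."""
    if not token:
        return {}
    # URL-safe base64 never contains ".", so the last "." separates the signature.
    payload, _, signature = token.rpartition(".")
    expected = _sig(secret, payload.encode())
    # Compare as bytes: compare_digest is constant-time and rejects non-ASCII str.
    if not payload or not hmac.compare_digest(signature.encode(), expected.encode()):
        raise BadRequest("Bad Signature. Please use only the tokens provided by the API.")
    return json.loads(base64.urlsafe_b64decode(payload))
```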
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
pierrejeambrun commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1852201214 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +accept: HeaderAcceptJsonOrText, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +): +"""Get logs for a specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." +) + +if metadata.get("download_logs") and metadata["download_logs"]: +full_content = True + +if full_content: +metadata["download_logs"] = True +else: +metadata["download_logs"] = False + +task_log_reader = TaskLogReader() + +if not task_log_reader.supports_read: +raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") + +query = ( +select(TaskInstance) +.where( +TaskInstance.task_id == task_id, +TaskInstance.dag_id == dag_id, +TaskInstance.run_id == dag_run_id, +TaskInstance.map_index == map_index, +) +.join(TaskInstance.dag_run) + .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) +) +ti = session.scalar(query) +if ti is None: +query = select(TaskInstanceHistory).where( +TaskInstanceHistory.task_id == task_id, +TaskInstanceHistory.dag_id == dag_id, +TaskInstanceHistory.run_id == dag_run_id, +TaskInstanceHistory.map_index == map_index, +TaskInstanceHistory.try_number == task_try_number, +) +ti = session.scalar(query) + +if ti is None: +metadata["end_of_log"] = True +raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found") + +dag = 
request.app.state.dag_bag.get_dag(dag_id) +if dag: +try: +ti.task = dag.get_task(ti.task_id) +except TaskNotFound: +pass + +logs: Any +if accept == Mimetype.JSON or accept == Mimetype.ANY: # default +logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) +logs = logs[0] if task_try_number is not None else logs Review Comment: Indeed, if someone passes `None`, pydantic should return a 422 error, so that code should never be reachable.
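The point can be checked directly. `task_try_number` is declared as a plain `int` path parameter, so any value that cannot be coerced to an integer fails pydantic validation, including the literal path segment `None`. FastAPI then answers with a 422 before the handler runs. Inside the handler the value is therefore never `None`, and `logs[0] if task_try_number is not None else logs` always takes the first branch. The sketch assumes pydantic v2 and uses `TypeAdapter` only to exercise the `int` validation outside a route. `is_valid_try_number` is a hypothetical helper.

```python
from pydantic import TypeAdapter, ValidationError

# Same validation FastAPI applies to a path parameter annotated as `int`.
_int_adapter = TypeAdapter(int)


def is_valid_try_number(raw: object) -> bool:
    """Hypothetical helper: True if `raw` would pass validation as an `int` path parameter."""
    try:
        _int_adapter.validate_python(raw)
    except ValidationError:
        # In a route this is the 422 response; the handler body never runs.
        return False
    return True
```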
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
pierrejeambrun commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1852200060 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +accept: HeaderAcceptJsonOrText, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +): +"""Get logs for a specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." Review Comment: Both sound good to me.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851963378 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, Review Comment: wondering if this should be NonNegativeInt
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
pierrejeambrun commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851736606 ## airflow/api_fastapi/core_api/routes/public/task_instances.py: ## @@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) + + +@task_instances_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=None, +) +def get_log( +*, +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +) -> Response | dict: +"""Get logs for specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." 
+) + +if metadata.get("download_logs") and metadata["download_logs"]: +full_content = True + +if full_content: +metadata["download_logs"] = True +else: +metadata["download_logs"] = False + +task_log_reader = TaskLogReader() + +if not task_log_reader.supports_read: +raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") + +query = ( +select(TaskInstance) +.where( +TaskInstance.task_id == task_id, +TaskInstance.dag_id == dag_id, +TaskInstance.run_id == dag_run_id, +TaskInstance.map_index == map_index, +) +.join(TaskInstance.dag_run) + .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) +) +ti = session.scalar(query) +if ti is None: +query = select(TaskInstanceHistory).where( +TaskInstanceHistory.task_id == task_id, +TaskInstanceHistory.dag_id == dag_id, +TaskInstanceHistory.run_id == dag_run_id, +TaskInstanceHistory.map_index == map_index, +TaskInstanceHistory.try_number == task_try_number, +) +ti = session.scalar(query) + +if ti is None: +metadata["end_of_log"] = True +raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found") + +dag = request.app.state.dag_bag.get_dag(dag_id) +if dag: +try: +ti.task = dag.get_task(ti.task_id) +except TaskNotFound: +pass + +return_type = request.headers["accept"] +# return_type would be either the above two or None +logs: Any +if return_type == "application/json" or return_type is None: # default +logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) +logs = logs[0] if task_try_number is not None else logs +# we must have token here, so we can safely ignore it +token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] +return TaskInstancesLogResponseObject(continuation_token=token, content=str(logs)).model_dump() +# text/plain. 
Stream +logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) +return Response(media_type="text/plain", content="".join(list(logs))) Review Comment: For the `Accept` header handling, you can take a look at `HeaderAcceptJsonOrText` and its usage. This way you can get rid of some manual handling and hard-coded strings. ## airflow/api_fastapi/core_api/routes/public/task_instances.py: ## Review Comment: You put the endpoint in `task_instances`. This file is already really big, and the endpoint originally lived in a separate `log_endpoint`; it might be better to keep it that way and create a new `log.py` file in the public routes. ## airflow/api_fastapi/core_api/routes/public/task_instances.py: ## @@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) + + +@task_instances_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=None, Review Comment: ```suggestion ``` ## airflow/api_fastapi/core_api/routes/public/task_instances.py: ## @@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) + + +@task_instances_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=None, +) +def get_log( +*, Review Comment: ```suggestion ``` ##
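The suggestion is to replace the manual `request.headers["accept"]` lookup and its hard-coded strings. The real definition of `HeaderAcceptJsonOrText` is not shown in this thread. The stdlib sketch below covers only the dispatch the handler performs once the header has been validated into an enum. `Mimetype` here is a hypothetical mirror of the names the new `log.py` compares against (`JSON`, `ANY`, plus an assumed `TEXT`). `render_logs` is also hypothetical, and for simplicity it joins the chunks in both branches.

```python
from __future__ import annotations

from enum import Enum


class Mimetype(str, Enum):
    """Hypothetical mirror of the Mimetype values the handler compares against."""

    JSON = "application/json"
    TEXT = "text/plain"
    ANY = "*/*"


def render_logs(accept: Mimetype, chunks: list[str], token: str) -> tuple[str, object]:
    """Return (media_type, body) for the negotiated representation."""
    content = "".join(chunks)
    if accept in (Mimetype.JSON, Mimetype.ANY):  # JSON is the default
        # JSON responses carry a continuation token for the next read.
        return Mimetype.JSON.value, {"continuation_token": token, "content": content}
    # text/plain: the log is returned as one plain string.
    return Mimetype.TEXT.value, content
```

Comparing against enum members keeps the MIME strings in one place instead of repeating them in every branch.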
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851956895 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +accept: HeaderAcceptJsonOrText, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +): +"""Get logs for specific task instance.""" Review Comment: ```suggestion """Get logs for a specific task instance.""" ```
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851963378 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, Review Comment: Wondering if `task_try_number` should be a `NonNegativeInt`.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851962540 ## airflow/api_fastapi/core_api/datamodels/log.py: ## @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from pydantic import BaseModel + + +class TaskInstancesLogResponse(BaseModel): +"""Log serializer for responses.""" + +content: str +continuation_token: str | None + +@property +def text_format(self): +# convert all config sections to text Review Comment: Nope, not needed.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851990971 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +accept: HeaderAcceptJsonOrText, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +): +"""Get logs for a specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." +) + +if metadata.get("download_logs") and metadata["download_logs"]: +full_content = True + +if full_content: +metadata["download_logs"] = True +else: +metadata["download_logs"] = False + +task_log_reader = TaskLogReader() + +if not task_log_reader.supports_read: +raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") + +query = ( +select(TaskInstance) +.where( +TaskInstance.task_id == task_id, +TaskInstance.dag_id == dag_id, +TaskInstance.run_id == dag_run_id, +TaskInstance.map_index == map_index, +) +.join(TaskInstance.dag_run) + .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) +) +ti = session.scalar(query) +if ti is None: +query = select(TaskInstanceHistory).where( +TaskInstanceHistory.task_id == task_id, +TaskInstanceHistory.dag_id == dag_id, +TaskInstanceHistory.run_id == dag_run_id, +TaskInstanceHistory.map_index == map_index, +TaskInstanceHistory.try_number == task_try_number, +) +ti = session.scalar(query) + +if ti is None: +metadata["end_of_log"] = True +raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found") + +dag = 
request.app.state.dag_bag.get_dag(dag_id) +if dag: +try: +ti.task = dag.get_task(ti.task_id) +except TaskNotFound: +pass + +logs: Any +if accept == Mimetype.JSON or accept == Mimetype.ANY: # default +logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) +logs = logs[0] if task_try_number is not None else logs Review Comment: We don't allow `task_try_number` to be `None`, so do we need to check for `None` here?
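Because `task_try_number` is declared as a required `int` path parameter, the `is not None` guard is always true on this branch. A minimal sketch comparing the two expressions; `old_select` and `new_select` are hypothetical names used only for illustration:

```python
from __future__ import annotations


def old_select(logs: list[str], task_try_number: int | None) -> list[str] | str:
    # Expression from the diff: only index into logs when a try number is given.
    return logs[0] if task_try_number is not None else logs


def new_select(logs: list[str], task_try_number: int) -> str:
    # task_try_number is a required int, so the guard can be dropped.
    return logs[0]
```

For every integer try number the two return the same value, so the conditional can become plain `logs = logs[0]`.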
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851988227 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +accept: HeaderAcceptJsonOrText, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +): +"""Get logs for a specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." +) + +if metadata.get("download_logs") and metadata["download_logs"]: +full_content = True + +if full_content: +metadata["download_logs"] = True +else: +metadata["download_logs"] = False Review Comment: ```suggestion full_content = bool(metadata.get("download_logs")) metadata["download_logs"] = full_content ``` Wondering if we can reduce this.
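The two `if` blocks can indeed collapse into two lines, but the `full_content` query parameter has to be OR'd in: with `full_content = bool(metadata.get("download_logs"))` alone, a request with `full_content=True` and no token would fall back to chunked output. A minimal sketch, where `resolve_download_logs` is a hypothetical helper and `original_branches` copies the diff's logic for comparison:

```python
def resolve_download_logs(full_content: bool, metadata: dict) -> bool:
    # A token minted for a full download keeps later requests in download
    # mode, and the query parameter can also switch it on.
    full_content = full_content or bool(metadata.get("download_logs"))
    metadata["download_logs"] = full_content
    return full_content


def original_branches(full_content: bool, metadata: dict) -> bool:
    # Branch-by-branch logic from the diff, kept for comparison.
    if metadata.get("download_logs") and metadata["download_logs"]:
        full_content = True
    if full_content:
        metadata["download_logs"] = True
    else:
        metadata["download_logs"] = False
    return full_content
```

Checking both functions over every combination of the flag and token metadata confirms they agree on the returned flag and on the mutated metadata.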
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851953321 ## airflow/api_fastapi/core_api/datamodels/log.py: ## @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from pydantic import BaseModel + + +class TaskInstancesLogResponse(BaseModel): +"""Log serializer for responses.""" + +content: str +continuation_token: str | None + +@property +def text_format(self): +# convert all config sections to text Review Comment: Do we need this?
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851968926 ## airflow/api_fastapi/core_api/routes/public/log.py: ## @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( +tags=["Task Instance"], 
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=TaskInstancesLogResponse, +) +def get_log( +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +accept: HeaderAcceptJsonOrText, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +): +"""Get logs for a specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." Review Comment: Since this is an issue with a field (`token`), could this be a 422 status? @pierrejeambrun
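To make the 400-vs-422 question concrete, here is a sketch of a signed continuation token whose rejection status is a single parameter. It is a stdlib HMAC stand-in, not the `itsdangerous.URLSafeSerializer` token format; `dumps`, `loads`, and `TokenError` are hypothetical names, and the 422 default only reflects the proposal in the comment:

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
from http import HTTPStatus


class TokenError(Exception):
    """Carries the HTTP status the endpoint would respond with."""

    def __init__(self, status: HTTPStatus, detail: str) -> None:
        super().__init__(detail)
        self.status = status
        self.detail = detail


def _sign(body: str, key: bytes) -> str:
    return hmac.new(key, body.encode(), hashlib.sha256).hexdigest()


def dumps(payload: dict, key: bytes) -> str:
    # URL-safe base64 never contains ".", so it can separate body and signature.
    body = base64.urlsafe_b64encode(json.dumps(payload, sort_keys=True).encode()).decode()
    return f"{body}.{_sign(body, key)}"


def loads(token: str, key: bytes, status: HTTPStatus = HTTPStatus.UNPROCESSABLE_ENTITY) -> dict:
    body, _, sig = token.rpartition(".")
    # Constant-time comparison; a tampered or foreign token is rejected.
    if not body or not hmac.compare_digest(sig.encode(), _sign(body, key).encode()):
        raise TokenError(status, "Bad Signature. Please use only the tokens provided by the API.")
    return json.loads(base64.urlsafe_b64decode(body))
```

In the real endpoint the `except BadSignature:` branch would raise `HTTPException` with whichever status is agreed on; the sketch only isolates that choice.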
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851959538 ## airflow/api_fastapi/core_api/datamodels/log.py: ## @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from pydantic import BaseModel + + +class TaskInstancesLogResponse(BaseModel): +"""Log serializer for responses.""" + +content: str +continuation_token: str | None + +@property +def text_format(self): +# convert all config sections to text +return "".join(list(self.content)) Review Comment: ```suggestion ```
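One reason the `text_format` property can go: `content` is typed as `str`, and on a string `"".join(list(...))` just splits it into characters and glues them back together. A minimal sketch of the removed body as a free function (`text_format` here is only for illustration):

```python
def text_format(content: str) -> str:
    # Body of the removed property: list() yields single characters and
    # "".join() reassembles them, so the result equals the input.
    return "".join(list(content))
```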
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851928603 ## tests/api_fastapi/core_api/routes/public/test_task_instances.py: ## @@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, test_client, session): assert response.json() == { "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `nonexistent_task`, try_number: `0` and map_index: `-1` was not found" } + + +class TestTaskInstancesLog: +DAG_ID = "dag_for_testing_log_endpoint" +RUN_ID = "dag_run_id_for_testing_log_endpoint" +TASK_ID = "task_for_testing_log_endpoint" +MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint" +TRY_NUMBER = 1 + +default_time = "2020-06-10T20:00:00+00:00" + +@pytest.fixture(autouse=True) +def setup_attrs(self, configure_loggers, dag_maker, session) -> None: +self.app = create_app() +self.client = TestClient(self.app) +# Make sure that the configure_logging is not cached +self.old_modules = dict(sys.modules) + +with dag_maker(self.DAG_ID, start_date=timezone.parse(self.default_time), session=session) as dag: +EmptyOperator(task_id=self.TASK_ID) + +@task(task_id=self.MAPPED_TASK_ID) +def add_one(x: int): +return x + 1 + +add_one.expand(x=[1, 2, 3]) + +dr = dag_maker.create_dagrun( +run_id=self.RUN_ID, +run_type=DagRunType.SCHEDULED, +logical_date=timezone.parse(self.default_time), +start_date=timezone.parse(self.default_time), +) + +self.app.state.dag_bag.bag_dag(dag) + +# Add dummy dag for checking picking correct log with same task_id and different dag_id case. 
+with dag_maker( +f"{self.DAG_ID}_copy", start_date=timezone.parse(self.default_time), session=session +) as dummy_dag: +EmptyOperator(task_id=self.TASK_ID) +dr2 = dag_maker.create_dagrun( +run_id=self.RUN_ID, +run_type=DagRunType.SCHEDULED, +logical_date=timezone.parse(self.default_time), +start_date=timezone.parse(self.default_time), +) +self.app.state.dag_bag.bag_dag(dummy_dag) + +for ti in dr.task_instances: +ti.try_number = 1 +ti.hostname = "localhost" +session.merge(ti) +for ti in dr2.task_instances: +ti.try_number = 1 +ti.hostname = "localhost" +session.merge(ti) +session.flush() +dag.clear() +dummy_dag.clear() +for ti in dr.task_instances: +ti.try_number = 2 +ti.hostname = "localhost" +session.merge(ti) +for ti in dr2.task_instances: +ti.try_number = 2 +ti.hostname = "localhost" +session.merge(ti) +session.flush() + +@pytest.fixture +def configure_loggers(self, tmp_path, create_log_template): +self.log_dir = tmp_path + +# TASK_ID +dir_path = tmp_path / f"dag_id={self.DAG_ID}" / f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}" +dir_path.mkdir(parents=True) + +log = dir_path / "attempt=1.log" +log.write_text("Log for testing.") + +# try number 2 +log = dir_path / "attempt=2.log" +log.write_text("Log for testing 2.") + +# MAPPED_TASK_ID +for map_index in range(3): +dir_path = ( +tmp_path +/ f"dag_id={self.DAG_ID}" +/ f"run_id={self.RUN_ID}" +/ f"task_id={self.MAPPED_TASK_ID}" +/ f"map_index={map_index}" +) + +dir_path.mkdir(parents=True) + +log = dir_path / "attempt=1.log" +log.write_text("Log for testing.") + +# try number 2 +log = dir_path / "attempt=2.log" +log.write_text("Log for testing 2.") + +# Create a custom logging configuration +logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) +logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir + +logging.config.dictConfig(logging_config) + +yield + +logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) + +def teardown_method(self): +clear_db_runs() + +@pytest.mark.parametrize("try_number", 
[1, 2]) +def test_should_respond_200_json(self, try_number): +key = self.app.state.secret_key +serializer = URLSafeSerializer(key) +token = serializer.dumps({"download_logs": False}) +response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", +params={"token": token}, +headers={"Accept": "application/json"}, +# e
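The `configure_loggers` fixture above writes one file per attempt, with an extra `map_index=` directory only for mapped tasks. A small sketch of that layout, assuming only what the fixture shows; `attempt_log_path` is a hypothetical helper and not Airflow's log filename template renderer:

```python
from pathlib import Path


def attempt_log_path(
    base: Path, dag_id: str, run_id: str, task_id: str, try_number: int, map_index: int = -1
) -> Path:
    path = base / f"dag_id={dag_id}" / f"run_id={run_id}" / f"task_id={task_id}"
    if map_index >= 0:
        # Mapped task instances get one extra directory level per index.
        path = path / f"map_index={map_index}"
    return path / f"attempt={try_number}.log"
```

With the default `map_index=-1` this yields the unmapped `TASK_ID` paths; passing `0`, `1`, or `2` yields the `MAPPED_TASK_ID` paths.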
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851927314 ## tests/api_fastapi/core_api/routes/public/test_task_instances.py: ## @@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, test_client, session): assert response.json() == { "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `nonexistent_task`, try_number: `0` and map_index: `-1` was not found" } + + +class TestTaskInstancesLog: +DAG_ID = "dag_for_testing_log_endpoint" +RUN_ID = "dag_run_id_for_testing_log_endpoint" +TASK_ID = "task_for_testing_log_endpoint" +MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint" +TRY_NUMBER = 1 + +default_time = "2020-06-10T20:00:00+00:00" + +@pytest.fixture(autouse=True) +def setup_attrs(self, configure_loggers, dag_maker, session) -> None: +self.app = create_app() +self.client = TestClient(self.app) Review Comment: Used the fixtures in the test cases. PTAL.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851926515 ## airflow/api_fastapi/core_api/routes/public/task_instances.py: ## Review Comment: Moved the new code, tests, and data models to `log.py`.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851927936 ## tests/api_fastapi/core_api/routes/public/test_task_instances.py: ## @@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, test_client, session): assert response.json() == { "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `nonexistent_task`, try_number: `0` and map_index: `-1` was not found" } + + +class TestTaskInstancesLog: +DAG_ID = "dag_for_testing_log_endpoint" +RUN_ID = "dag_run_id_for_testing_log_endpoint" +TASK_ID = "task_for_testing_log_endpoint" +MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint" +TRY_NUMBER = 1 + +default_time = "2020-06-10T20:00:00+00:00" + +@pytest.fixture(autouse=True) +def setup_attrs(self, configure_loggers, dag_maker, session) -> None: +self.app = create_app() Review Comment: Used the fixture in the test. PTAL.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851925707 ## airflow/api_fastapi/core_api/routes/public/task_instances.py: ## @@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) + + +@task_instances_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=None, Review Comment: Updated it, using the `get_config` and `get_dag_source` APIs as a reference. PTAL.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851923675 ## airflow/api_fastapi/core_api/routes/public/task_instances.py: ## @@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) + + +@task_instances_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=None, +) +def get_log( +*, Review Comment: Removed.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851919813 ## airflow/api_fastapi/core_api/routes/public/task_instances.py: ## @@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) + + +@task_instances_router.get( +"/{task_id}/logs/{task_try_number}", +responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +response_model=None, +) +def get_log( +*, +dag_id: str, +dag_run_id: str, +task_id: str, +task_try_number: int, +request: Request, +session: Annotated[Session, Depends(get_session)], +full_content: bool = False, +map_index: int = -1, +token: str | None = None, +) -> Response | dict: +"""Get logs for specific task instance.""" +if not token: +metadata = {} +else: +try: +metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) +except BadSignature: +raise HTTPException( +status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." 
+) + +if metadata.get("download_logs") and metadata["download_logs"]: +full_content = True + +if full_content: +metadata["download_logs"] = True +else: +metadata["download_logs"] = False + +task_log_reader = TaskLogReader() + +if not task_log_reader.supports_read: +raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") + +query = ( +select(TaskInstance) +.where( +TaskInstance.task_id == task_id, +TaskInstance.dag_id == dag_id, +TaskInstance.run_id == dag_run_id, +TaskInstance.map_index == map_index, +) +.join(TaskInstance.dag_run) + .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) +) +ti = session.scalar(query) +if ti is None: +query = select(TaskInstanceHistory).where( +TaskInstanceHistory.task_id == task_id, +TaskInstanceHistory.dag_id == dag_id, +TaskInstanceHistory.run_id == dag_run_id, +TaskInstanceHistory.map_index == map_index, +TaskInstanceHistory.try_number == task_try_number, +) +ti = session.scalar(query) + +if ti is None: +metadata["end_of_log"] = True +raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found") + +dag = request.app.state.dag_bag.get_dag(dag_id) +if dag: +try: +ti.task = dag.get_task(ti.task_id) +except TaskNotFound: +pass + +return_type = request.headers["accept"] +# return_type would be either the above two or None +logs: Any +if return_type == "application/json" or return_type is None: # default +logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) +logs = logs[0] if task_try_number is not None else logs +# we must have token here, so we can safely ignore it +token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] +return TaskInstancesLogResponseObject(continuation_token=token, content=str(logs)).model_dump() +# text/plain. 
Stream +logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) +return Response(media_type="text/plain", content="".join(list(logs))) Review Comment: Used `accept: HeaderAcceptJsonOrText` as suggested. PTAL.
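The Accept-header branching discussed here can be pictured as a pure function that picks a media type and a body. This is a sketch only: `Mimetype` is a local enum whose values are assumed to mirror the `Mimetype.JSON` / `Mimetype.ANY` members referenced in the diff, `render_logs` is hypothetical, and the real endpoint returns a `TaskInstancesLogResponse` or a `Response` rather than a tuple:

```python
from __future__ import annotations

from collections.abc import Iterable
from enum import Enum
from typing import Any


class Mimetype(str, Enum):
    # Assumed values; the real enum lives in airflow.api_fastapi.common.types.
    TEXT = "text/plain"
    JSON = "application/json"
    ANY = "*/*"


def render_logs(
    accept: Mimetype, chunks: list[Any], stream: Iterable[str], token: str | None
) -> tuple[str, Any]:
    """Return (media_type, body) for the negotiated Accept header."""
    if accept in (Mimetype.JSON, Mimetype.ANY):  # JSON is the default
        return "application/json", {"content": str(chunks[0]), "continuation_token": token}
    # text/plain: concatenate the streamed pieces into a single body.
    return "text/plain", "".join(stream)
```

Keeping the JSON branch as the default for `*/*` matches the `# default` comment in the diff, so clients that send no specific Accept value still get the paginated form with a continuation token.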
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
utkarsharma2 commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851918926 ## airflow/api_fastapi/core_api/datamodels/task_instances.py: ## @@ -150,3 +150,11 @@ class TaskInstanceHistoryCollectionResponse(BaseModel): task_instances: list[TaskInstanceHistoryResponse] total_entries: int + + +# Response Models +class TaskInstancesLogResponseObject(BaseModel): Review Comment: Updated it accordingly, PTAL.
Re: [PR] AIP-84: Migrate get_log endpoint [airflow]
rawwar commented on code in PR #44238: URL: https://github.com/apache/airflow/pull/44238#discussion_r1851754362 ## airflow/api_fastapi/core_api/datamodels/task_instances.py: ## @@ -150,3 +150,11 @@ class TaskInstanceHistoryCollectionResponse(BaseModel): task_instances: list[TaskInstanceHistoryResponse] total_entries: int + + +# Response Models +class TaskInstancesLogResponseObject(BaseModel): Review Comment: ```suggestion class TaskInstancesLogResponse(BaseModel): ``` This makes it consistent with the other classes.