Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
github-actions[bot] commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238814837 ### Backport failed to create: v3-2-test. View the failure log Run details Note: As of [Merging PRs targeted for Airflow 3.X](https://github.com/apache/airflow/blob/main/dev/README_AIRFLOW3_DEV.md#merging-prs-targeted-for-airflow-3x) the committer who merges the PR is responsible for backporting the PRs that are bug fixes (generally speaking) to the maintenance branches. In matter of doubt please ask in [#release-management](https://apache-airflow.slack.com/archives/C03G9H97MM2) Slack channel. Status Branch Result ❌ v3-2-test https://github.com/apache/airflow/commit/5559365a4b016817fad3d16ec710b716f1ddd945";> You can attempt to backport this manually by running: ```bash cherry_picker 5559365 v3-2-test ``` This should apply the commit to the v3-2-test branch and leave the commit in conflict state marking the files that need manual conflict resolution. After you have resolved the conflicts, you can continue the backport process by running: ```bash cherry_picker --continue ``` If you don't have cherry-picker installed, see the [installation guide](https://github.com/apache/airflow/blob/main/dev/README_AIRFLOW3_DEV.md#how-to-backport-pr-with-cherry-picker-cli). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
ashb merged PR #65162: URL: https://github.com/apache/airflow/pull/65162 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
potiuk commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238400440 Out of curiosity - because It seems that with request scope we are going to keep opened database session throughout the whole streaming. Would not that be better (and getting read of the Fast API Dependency ? ```python with _get_session() as session: tis = session.execute( select( TaskInstance.task_id, TaskInstance.state, TaskInstance.dag_version_id, TaskInstance.start_date, TaskInstance.end_date, DagVersion.version_number, ) .outerjoin(DagVersion, TaskInstance.dag_version_id == DagVersion.id) .where(TaskInstance.dag_id == dag_id) .where(TaskInstance.run_id == run_id) .order_by(TaskInstance.task_id) .execution_options(yield_per=1000) ) summary = _build_ti_summaries( dag_id, run_id, tis, session, serdag_cache=serdag_cache, ) if summary is None: continue ``` I probably miss something about async behaviour here (maybe?) but this one would only keep session only for single run_id retrieval and free it while the task summary is being streamed? It seems a bit wasteful to keep the sesssion opened and held whlie we are essentially streaming data back? Or am I wrong? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
ashb commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238242208 @seruman Nah all good. The more eyes the better! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
seruman commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238240292 > Oh actually, no that one is fine, as the DagRunWaiter manages it's own sessions cleanly, so the function-scoped dep is fine (and means that one session isn't kept open.) > > I've updated the tests and added exclusion lists. Oh my bad, sorry about that. Jump to conclusion without trying to reproduce and did some dumb pattern matching. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
ashb commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238227167 Oh actually, no that one is fine, as the DagRunWaiter manages it's own sessions cleanly, so the function-scoped dep is fine (and means that one session isn't kept open.) I've updated the tests and added exclusion lists. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
ashb commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238170379 Good catch @seruman! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
seruman commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238166949 This one seems to slip behind the type annotation check in the test; it streams, accepts `SessionDep` but lacks type annotation. https://github.com/apache/airflow/blob/048e9a191b4a6625bb4bbad23fad537b256f4062/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py#L539-L573 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
potiuk commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238145988 Nice find ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
jedcunningham commented on PR #65162: URL: https://github.com/apache/airflow/pull/65162#issuecomment-4238154764 I tested this against my helm based reproduction, and it solved it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
Copilot commented on code in PR #65162:
URL: https://github.com/apache/airflow/pull/65162#discussion_r3074463620
##
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -453,7 +453,7 @@ def get_node_summaries() -> Iterable[dict[str, Any]]:
)
def get_grid_ti_summaries_stream(
dag_id: str,
-session: SessionDep,
+session: Annotated[Session, Depends(_get_session)],
Review Comment:
`_get_session` is a module-private helper (leading underscore) but is now
imported/used directly from another module. To keep dependency usage consistent
and avoid coupling to a private symbol, consider exposing a public
request-scoped session dependency (e.g. `RequestSessionDep = Annotated[Session,
Depends(_get_session)]`) in `common.db.common` and using that here (or
rename/export the helper) instead of importing `_get_session` directly.
##
airflow-core/tests/unit/api_fastapi/core_api/test_app.py:
##
@@ -16,13 +16,71 @@
# under the License.
from __future__ import annotations
+import typing
+
import pytest
+from fastapi.params import Depends as DependsClass
+from fastapi.responses import StreamingResponse
+from starlette.routing import Mount
+
+from airflow.api_fastapi.app import create_app
from tests_common.test_utils.db import clear_db_jobs
pytestmark = pytest.mark.db_test
+def _get_all_api_routes(app):
+"""Recursively yield all APIRoutes from the app and its mounted
sub-apps."""
+for route in getattr(app, "routes", []):
+if isinstance(route, Mount) and hasattr(route, "app"):
+yield from _get_all_api_routes(route.app)
+if hasattr(route, "endpoint"):
+yield route
+
+
+class TestStreamingEndpointSessionScope:
+def test_no_streaming_endpoint_uses_function_scoped_depends(self):
+"""Streaming endpoints must not use function-scoped generator
dependencies.
+
+FastAPI's ``function_stack`` (used for ``scope="function"``
dependencies)
+is torn down after the route handler returns but *before* the response
body
+is sent. For ``StreamingResponse`` endpoints the response body is
produced
+by a generator that runs during sending, so any generator dependency
with
+``scope="function"`` will have its cleanup run before the generator
+executes. This causes the generator to silently reopen the session via
+autobegin, and the resulting connection is never returned to the pool.
+"""
+app = create_app()
+violations = []
+for route in _get_all_api_routes(app):
+try:
+hints = typing.get_type_hints(route.endpoint,
include_extras=True)
+except Exception:
+continue
+if hints.get("return") is not StreamingResponse:
+continue
Review Comment:
This test only flags routes whose *return type annotation* is exactly
`StreamingResponse`, and it silently skips any endpoint where
`typing.get_type_hints()` raises. That combination can allow streaming handlers
to evade the check (e.g. endpoints that return a `StreamingResponse` without an
explicit return annotation, or future endpoints whose annotations can’t be
resolved at runtime). Consider identifying streaming routes via the FastAPI
route metadata (e.g. `APIRoute.response_class is StreamingResponse`) and/or
failing the test when type-hint inspection can’t be performed so the guardrail
can’t be bypassed unintentionally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Prevent Session leak from StreamingResponse API endpoints. [airflow]
ashb commented on code in PR #65162: URL: https://github.com/apache/airflow/pull/65162#discussion_r3074414334 ## airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py: ## @@ -453,7 +453,7 @@ def get_node_summaries() -> Iterable[dict[str, Any]]: ) def get_grid_ti_summaries_stream( dag_id: str, -session: SessionDep, +session: Annotated[Session, Depends(_get_session)], Review Comment: It's a bit "icky" to import `_get_session`, but since this is the only occurrence of it now I figured it was better to live with this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
