Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
pierrejeambrun merged PR #43881: URL: https://github.com/apache/airflow/pull/43881 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1841781578

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -74,6 +84,56 @@ def get_assets(
     )
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),

Review Comment:
Done
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
pierrejeambrun commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1841771388

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -74,6 +84,56 @@ def get_assets(
     )
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),

Review Comment:
401 and 403 have been added to the root router; all routes now inherit them, so we don't need to specify them anymore. Only other errors, such as 404, need to be specified.
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840921170

## airflow/api_fastapi/core_api/datamodels/assets.py: ##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
     assets: list[AssetResponse]
     total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+    """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""

Review Comment:
Done
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881: URL: https://github.com/apache/airflow/pull/43881#discussion_r1840920332 ## tests/api_fastapi/core_api/routes/public/test_assets.py: ## @@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client): assert response.status_code == 200 assert len(response.json()["assets"]) == 100 + + +class TestGetAssetEvents(TestAssets): +def test_should_respond_200(self, test_client, session): +self.create_assets() +self.create_assets_events() +self.create_dag_run() +self.create_asset_dag_run() +assets = session.query(AssetEvent).all() +assert len(assets) == 2 +response = test_client.get("/public/assets/events") +assert response.status_code == 200 +response_data = response.json() +assert response_data == { +"asset_events": [ +{ +"id": 1, +"asset_id": 1, +"asset_uri": "s3://bucket/key/1", +"extra": {"foo": "bar"}, +"source_task_id": "source_task_id", +"source_dag_id": "source_dag_id", +"source_run_id": "source_run_id_1", +"source_map_index": -1, +"created_dagruns": [ +{ +"run_id": "source_run_id_1", +"dag_id": "source_dag_id", +"logical_date": "2020-06-11T18:00:00Z", +"start_date": "2020-06-11T18:00:00Z", +"end_date": "2020-06-11T18:00:00Z", +"state": "success", +"data_interval_start": "2020-06-11T18:00:00Z", +"data_interval_end": "2020-06-11T18:00:00Z", +} +], +"timestamp": "2020-06-11T18:00:00Z", +}, +{ +"id": 2, +"asset_id": 2, +"asset_uri": "s3://bucket/key/2", +"extra": {"foo": "bar"}, +"source_task_id": "source_task_id", +"source_dag_id": "source_dag_id", +"source_run_id": "source_run_id_2", +"source_map_index": -1, +"created_dagruns": [ +{ +"run_id": "source_run_id_2", +"dag_id": "source_dag_id", +"logical_date": "2020-06-11T18:00:00Z", +"start_date": "2020-06-11T18:00:00Z", +"end_date": "2020-06-11T18:00:00Z", +"state": "success", +"data_interval_start": "2020-06-11T18:00:00Z", +"data_interval_end": "2020-06-11T18:00:00Z", +} +], +"timestamp": "2020-06-11T18:00:00Z", +}, +], +"total_entries": 2, +} + 
+@pytest.mark.parametrize( +"filter_type, filter_value, total_entries", +[ +("asset_id", "2", 1), +("source_dag_id", "source_dag_id", 2), +("source_task_id", "source_task_id", 2), +("source_run_id", "source_run_id_1", 1), +("source_map_index", "-1", 2), +], +) +@provide_session +def test_filtering(self, test_client, filter_type, filter_value, total_entries, session): Review Comment: added `test_limit_and_offset` test ## tests/api_fastapi/core_api/routes/public/test_assets.py: ## @@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client): assert response.status_code == 200 assert len(response.json()["assets"]) == 100 + + +class TestGetAssetEvents(TestAssets): +def test_should_respond_200(self, test_client, session): +self.create_assets() +self.create_assets_events() +self.create_dag_run() +self.create_asset_dag_run() +assets = session.query(AssetEvent).all() +assert len(assets) == 2 +response = test_client.get("/public/assets/events") +assert response.status_code == 200 +response_data = response.json() +assert response_data == { +"asset_events": [ +{ +"id": 1, +"asset_id": 1, +"asset_uri": "s3://bucket/key/1", +"extra": {"foo": "bar"}, +"source_task_id": "source_task_id", +"source_dag_id": "source_dag_id", +"source_run_id": "source_run_id_1", +"source_map_index": -1, +"created_dagruns": [ +
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840802901

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -69,3 +79,52 @@ def get_assets(
         assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
         total_entries=total_entries,
     )
+
+
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+    limit: QueryLimit,
+    offset: QueryOffset,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                [
+                    "source_task_id",
+                    "source_dag_id",
+                    "source_run_id",
+                    "source_map_index",
+                    "timestamp",
+                ],
+                AssetEvent,
+            ).dynamic_depends("timestamp")
+        ),
+    ],
+    asset_id: QueryAssetIdFilter,
+    source_dag_id: QuerySourceDagIdFilter,
+    source_task_id: QuerySourceTaskIdFilter,
+    source_run_id: QuerySourceRunIdFilter,
+    source_map_index: QuerySourceMapIndexFilter,
+    session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+    """Get asset events."""
+    assets_event_select, total_entries = paginated_select(
+        select(AssetEvent),
+        filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index],
+        order_by=order_by,
+        offset=offset,
+        limit=limit,
+        session=session,
+    )
+
+    assets_events = session.scalars(assets_event_select).all()

Review Comment:
Done
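
The `paginated_select` call in this hunk yields both a page of rows and a `total_entries` count. The general pattern (filter, count, sort, then slice by `offset` and `limit`) can be sketched in plain Python. This is only an illustration under stated assumptions: `paginate` and the sample `events` data are hypothetical and are not Airflow's implementation, which builds a SQL statement instead of working on lists.

```python
def paginate(rows, filters=(), order_key=None, offset=0, limit=100):
    """Hypothetical helper: filter, count, sort, then slice a list of rows."""
    # Keep only rows that satisfy every filter predicate.
    matching = [row for row in rows if all(f(row) for f in filters)]
    # The total is taken after filtering but before offset/limit are applied.
    total_entries = len(matching)
    if order_key is not None:
        matching = sorted(matching, key=order_key)
    # Slice out the requested page.
    page = matching[offset : offset + limit]
    return page, total_entries


# Made-up sample data standing in for asset events.
events = [{"id": i, "asset_id": 1 + i % 2, "timestamp": 100 - i} for i in range(1, 6)]

page, total_entries = paginate(
    events,
    filters=[lambda e: e["asset_id"] == 2],
    order_key=lambda e: e["timestamp"],
    offset=1,
    limit=1,
)
```

Keeping the count separate from the page lets a client know how many entries exist in total even when it only receives one page.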
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840802410

## airflow/api_fastapi/core_api/datamodels/assets.py: ##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
     assets: list[AssetResponse]
     total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+    """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+    run_id: str
+    dag_id: str
+    execution_date: datetime = Field(alias="logical_date")
+    start_date: datetime
+    end_date: datetime
+    state: str
+    data_interval_start: datetime
+    data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+    """Asset event serializer for responses."""
+
+    id: int
+    asset_id: int
+    asset_uri: str
+    extra: dict | None = None
+    source_task_id: str | None = None
+    source_dag_id: str | None = None
+    source_run_id: str | None = None
+    source_map_index: int
+    created_dagruns: list[DagRunAssetReference]
+    timestamp: datetime
+
+    @model_validator(mode="before")
+    def rename_uri_to_asset_uri(cls, values):
+        """Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
+        if hasattr(values, "uri") and values.uri:
+            values.asset_uri = values.uri
+        return values

Review Comment:
Done

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -69,3 +79,52 @@ def get_assets(
         assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
         total_entries=total_entries,
     )
+
+
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(

Review Comment:
Done
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840800540

## airflow/api_fastapi/core_api/datamodels/assets.py: ##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
     assets: list[AssetResponse]
     total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+    """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+    run_id: str
+    dag_id: str
+    execution_date: datetime = Field(alias="logical_date")
+    start_date: datetime
+    end_date: datetime
+    state: str
+    data_interval_start: datetime
+    data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+    """Asset event serializer for responses."""
+
+    id: int
+    asset_id: int
+    asset_uri: str
+    extra: dict | None = None
+    source_task_id: str | None = None
+    source_dag_id: str | None = None
+    source_run_id: str | None = None
+    source_map_index: int
+    created_dagruns: list[DagRunAssetReference]
+    timestamp: datetime
+
+    @model_validator(mode="before")
+    def rename_uri_to_asset_uri(cls, values):
+        """Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
+        if hasattr(values, "uri") and values.uri:
+            values.asset_uri = values.uri
+        return values
+
+
+class AssetEventCollectionResponse(BaseModel):
+    """Asset collection response."""

Review Comment:
Updated the docstring.
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
pierrejeambrun commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840730250

## airflow/api_fastapi/core_api/datamodels/assets.py: ##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
     assets: list[AssetResponse]
     total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+    """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""

Review Comment:
```suggestion
    """DAGRun serializer for asset responses."""
```
(We need to remove the ORM SQLAlchemy reference. This does not only serialize SQLAlchemy objects.)

## tests/api_fastapi/core_api/routes/public/test_assets.py: ##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):
+    def test_should_respond_200(self, test_client, session):
+        self.create_assets()
+        self.create_assets_events()
+        self.create_dag_run()
+        self.create_asset_dag_run()
+        assets = session.query(AssetEvent).all()
+        assert len(assets) == 2
+        response = test_client.get("/public/assets/events")
+        assert response.status_code == 200
+        response_data = response.json()
+        assert response_data == {
+            "asset_events": [
+                {
+                    "id": 1,
+                    "asset_id": 1,
+                    "asset_uri": "s3://bucket/key/1",
+                    "extra": {"foo": "bar"},
+                    "source_task_id": "source_task_id",
+                    "source_dag_id": "source_dag_id",
+                    "source_run_id": "source_run_id_1",
+                    "source_map_index": -1,
+                    "created_dagruns": [
+                        {
+                            "run_id": "source_run_id_1",
+                            "dag_id": "source_dag_id",
+                            "logical_date": "2020-06-11T18:00:00Z",
+                            "start_date": "2020-06-11T18:00:00Z",
+                            "end_date": "2020-06-11T18:00:00Z",
+                            "state": "success",
+                            "data_interval_start": "2020-06-11T18:00:00Z",
+                            "data_interval_end": "2020-06-11T18:00:00Z",
+                        }
+                    ],
+                    "timestamp": "2020-06-11T18:00:00Z",
+                },
+                {
+                    "id": 2,
+                    "asset_id": 2,
+                    "asset_uri": "s3://bucket/key/2",
+                    "extra": {"foo": "bar"},
+                    "source_task_id": "source_task_id",
+                    "source_dag_id": "source_dag_id",
+                    "source_run_id": "source_run_id_2",
+                    "source_map_index": -1,
+                    "created_dagruns": [
+                        {
+                            "run_id": "source_run_id_2",
+                            "dag_id": "source_dag_id",
+                            "logical_date": "2020-06-11T18:00:00Z",
+                            "start_date": "2020-06-11T18:00:00Z",
+                            "end_date": "2020-06-11T18:00:00Z",
+                            "state": "success",
+                            "data_interval_start": "2020-06-11T18:00:00Z",
+                            "data_interval_end": "2020-06-11T18:00:00Z",
+                        }
+                    ],
+                    "timestamp": "2020-06-11T18:00:00Z",
+                },
+            ],
+            "total_entries": 2,
+        }
+
+    @pytest.mark.parametrize(
+        "filter_type, filter_value, total_entries",
+        [
+            ("asset_id", "2", 1),
+            ("source_dag_id", "source_dag_id", 2),
+            ("source_task_id", "source_task_id", 2),
+            ("source_run_id", "source_run_id_1", 1),
+            ("source_map_index", "-1", 2),
+        ],
+    )
+    @provide_session
+    def test_filtering(self, test_client, filter_type, filter_value, total_entries, session):
+        self.create_assets()
+        self.create_assets_events()
+        self.create_dag_run()
+        self.create_asset_dag_run()
+        response = test_client.get(f"/public/assets/events?{filter_type}={filter_value}")

Review Comment:
Don't format the URL manually; use `params={}` with a dict of parameters and leave that to the test_client. (Some values need encoding, etc.)

## tests/api_fastapi/core_api/routes/public/test_assets.py: ##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):
+    def test_should_respond_200(self, test_client, session):
+        self.create_asset
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
pierrejeambrun commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840683048

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -69,3 +79,52 @@ def get_assets(
         assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
         total_entries=total_entries,
     )
+
+
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+    limit: QueryLimit,
+    offset: QueryOffset,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                [
+                    "source_task_id",
+                    "source_dag_id",
+                    "source_run_id",
+                    "source_map_index",
+                    "timestamp",
+                ],
+                AssetEvent,
+            ).dynamic_depends("timestamp")
+        ),
+    ],
+    asset_id: QueryAssetIdFilter,
+    source_dag_id: QuerySourceDagIdFilter,
+    source_task_id: QuerySourceTaskIdFilter,
+    source_run_id: QuerySourceRunIdFilter,
+    source_map_index: QuerySourceMapIndexFilter,
+    session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+    """Get asset events."""
+    assets_event_select, total_entries = paginated_select(
+        select(AssetEvent),
+        filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index],
+        order_by=order_by,
+        offset=offset,
+        limit=limit,
+        session=session,
+    )
+
+    assets_events = session.scalars(assets_event_select).all()

Review Comment:
The original loading `options` for the relationship have been removed. They are important to keep, as they make serialization faster and prevent multiple DB queries from being emitted lazily. You need to add back the `query.options(subqueryload(AssetEvent.created_dagruns))`.

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -69,3 +79,52 @@ def get_assets(
         assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
         total_entries=total_entries,
     )
+
+
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(

Review Comment:
The definition of routes has changed recently in https://github.com/apache/airflow/pull/43797. Routes should be declared `sync`:
```suggestion
def get_asset_events(
```

## airflow/api_fastapi/core_api/datamodels/assets.py: ##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
     assets: list[AssetResponse]
     total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+    """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+    run_id: str
+    dag_id: str
+    execution_date: datetime = Field(alias="logical_date")
+    start_date: datetime
+    end_date: datetime
+    state: str
+    data_interval_start: datetime
+    data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+    """Asset event serializer for responses."""
+
+    id: int
+    asset_id: int
+    asset_uri: str
+    extra: dict | None = None
+    source_task_id: str | None = None
+    source_dag_id: str | None = None
+    source_run_id: str | None = None
+    source_map_index: int
+    created_dagruns: list[DagRunAssetReference]
+    timestamp: datetime
+
+    @model_validator(mode="before")
+    def rename_uri_to_asset_uri(cls, values):
+        """Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
+        if hasattr(values, "uri") and values.uri:
+            values.asset_uri = values.uri
+        return values

Review Comment:
We can remove this and simply have `uri` in the response. Going from `dataset_uri` to `asset_uri` is already a breaking change, so we might as well take the opportunity to make it just `uri`.

## airflow/api_fastapi/core_api/datamodels/assets.py: ##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
     assets: list[AssetResponse]
     total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+    """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+    run_id: str
+    dag_id: str
+    execution_date: datetime = Field(alias="logical_date")
+    start_date: datetime
+    end_date: datetime
+    state: str
+    data_interval_start: datetime
+    data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+    """Asset event serializer for responses."""
+
+    id: int
+    asset_id: int
+    asset_uri: str
+    extra: dict | None = None
+    source_task_id: str | None = None
+    source_dag_id: str | None = None
+    source_run_id: str | None = None
+    source_map_index: int
+    created_dagruns: list[DagRunAssetReference]
+    timestamp: datetime
+
+    @model_validator(mode="b
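
The cost of lazy relationship loading raised in the first comment of this message can be sketched without SQLAlchemy. The example below uses only the standard-library `sqlite3` module. The table names, columns, and the `CountingConnection` helper are hypothetical and merely imitate the query pattern: one extra query per event when related rows are loaded lazily, versus a single additional query for all events when they are loaded eagerly, which is roughly what `subqueryload` is meant to achieve.

```python
import sqlite3


def make_db():
    """Create an in-memory database with two events and one DAG run each (made-up schema)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE asset_event (id INTEGER PRIMARY KEY, uri TEXT);
        CREATE TABLE dag_run (id INTEGER PRIMARY KEY, event_id INTEGER, run_id TEXT);
        INSERT INTO asset_event VALUES (1, 's3://bucket/key/1'), (2, 's3://bucket/key/2');
        INSERT INTO dag_run VALUES (1, 1, 'source_run_id_1'), (2, 2, 'source_run_id_2');
        """
    )
    return conn


class CountingConnection:
    """Hypothetical wrapper that counts how many SELECT statements are issued."""

    def __init__(self, conn):
        self.conn = conn
        self.count = 0

    def execute(self, sql, params=()):
        self.count += 1
        return self.conn.execute(sql, params).fetchall()


def load_lazily(db):
    # One query for the events, then one more query per event (the N+1 pattern).
    events = db.execute("SELECT id, uri FROM asset_event ORDER BY id")
    return {
        event_id: [row[0] for row in db.execute(
            "SELECT run_id FROM dag_run WHERE event_id = ?", (event_id,)
        )]
        for event_id, _uri in events
    }


def load_eagerly(db):
    # One query for the events, one query for all related runs at once.
    events = db.execute("SELECT id, uri FROM asset_event ORDER BY id")
    runs = db.execute(
        "SELECT event_id, run_id FROM dag_run "
        "WHERE event_id IN (SELECT id FROM asset_event)"
    )
    grouped = {event_id: [] for event_id, _uri in events}
    for event_id, run_id in runs:
        grouped[event_id].append(run_id)
    return grouped


lazy_db = CountingConnection(make_db())
eager_db = CountingConnection(make_db())
lazy_result = load_lazily(lazy_db)
eager_result = load_eagerly(eager_db)
```

Both strategies return the same data. The lazy version issues a number of queries that grows with the number of events, while the eager version stays at two regardless of page size.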
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
amoghrajesh commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840351144

## tests/api_fastapi/core_api/routes/public/test_assets.py: ##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):

Review Comment:
Yes, it looks fine now.
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840306996

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -69,3 +79,46 @@ def get_assets(
         assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
         total_entries=total_entries,
     )
+
+
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+    limit: QueryLimit,
+    offset: QueryOffset,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"],
+                AssetEvent,
+            ).dynamic_depends("timestamp")

Review Comment:
Resolving this conversation, as the legacy API behaves the same way.
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840306325

## tests/api_fastapi/core_api/routes/public/test_assets.py: ##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):

Review Comment:
Can we resolve this conversation, as per our discussion, @amoghrajesh?
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840175375

## tests/api_fastapi/core_api/routes/public/test_assets.py: ##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):

Review Comment:
@amoghrajesh here we are just verifying that `order_by` raises a 400 for any invalid attribute provided. Do you want me to cover a specific case here? Also, I see that the legacy [test](https://github.com/astronomer/airflow/blob/dc4def7c87441554ff106e65ec2b7894cdba7b0e/tests/api_connexion/endpoints/test_asset_endpoint.py#L180) validates only the same thing.
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
amoghrajesh commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840121304

## tests/api_fastapi/core_api/routes/public/test_assets.py: ##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetsEvents(TestAssets):

Review Comment:
OK, great, makes sense.

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -69,3 +79,46 @@ def get_assets(
         assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
         total_entries=total_entries,
     )
+
+
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+    limit: QueryLimit,
+    offset: QueryOffset,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"],
+                AssetEvent,
+            ).dynamic_depends("timestamp")

Review Comment:
Is that how the legacy API behaves? If so, let's keep it that way.

## tests/api_fastapi/core_api/routes/public/test_assets.py: ##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):

Review Comment:
Let's cover the case for `test_order_by_raises_400_for_invalid_attr`.
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1839862686

## tests/api_fastapi/core_api/routes/public/test_assets.py: ##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetsEvents(TestAssets):

Review Comment:
#1 Renamed to `test_filtering`.
#2 I am already verifying the created dagrun [here](https://github.com/astronomer/airflow/blob/674aff9f5643a31779bfd2584d1140b23772f1a4/tests/api_fastapi/core_api/routes/public/test_assets.py#L309).
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1839843890

## airflow/api_fastapi/core_api/routes/public/assets.py: ##
@@ -69,3 +79,46 @@ def get_assets(
         assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
         total_entries=total_entries,
     )
+
+
+@assets_router.get(
+    "/events",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+    limit: QueryLimit,
+    offset: QueryOffset,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"],
+                AssetEvent,
+            ).dynamic_depends("timestamp")

Review Comment:
I saw [this](https://github.com/apache/airflow/blob/main/airflow/api_connexion/endpoints/asset_endpoint.py#L124) in the legacy code, which is why I made `timestamp` the default. I also noticed that when I remove the `timestamp` default, `id` is used as the default instead.
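
The default-ordering behaviour discussed in this comment can be sketched in plain Python. This is not Airflow's `SortParam`: `resolve_order_by` is a hypothetical helper, the leading `-` for descending order is an assumption, and `ValueError` stands in for the HTTP 400 response that the tests in this PR check for.

```python
# Attributes a client may sort by, taken from the list passed to SortParam in the hunk.
ALLOWED_ATTRS = [
    "source_task_id",
    "source_dag_id",
    "source_run_id",
    "source_map_index",
    "timestamp",
]


def resolve_order_by(order_by=None, allowed=ALLOWED_ATTRS, default="timestamp"):
    """Hypothetical helper: pick the sort attribute and direction, or reject the request."""
    # Fall back to the default attribute when the client sends no order_by.
    value = default if order_by is None else order_by
    # Assumption: a leading "-" requests descending order.
    descending = value.startswith("-")
    attr = value[1:] if descending else value
    if attr not in allowed:
        # Stand-in for an HTTP 400 response.
        raise ValueError(f"Ordering with '{attr}' is disallowed or the attribute does not exist")
    return attr, descending


default_choice = resolve_order_by()
explicit_choice = resolve_order_by("-source_dag_id")
```

Making the default explicit, as the route does with `dynamic_depends("timestamp")`, avoids silently falling back to a different column when no `order_by` is supplied.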
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
amoghrajesh commented on code in PR #43881: URL: https://github.com/apache/airflow/pull/43881#discussion_r1839620372 ## airflow/api_fastapi/core_api/routes/public/assets.py: ## @@ -69,3 +79,46 @@ def get_assets( assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets], total_entries=total_entries, ) + + +@assets_router.get( +"/events", +responses=create_openapi_http_exception_doc([401, 403, 404]), +) +async def get_asset_events( +limit: QueryLimit, +offset: QueryOffset, +order_by: Annotated[ +SortParam, +Depends( +SortParam( +["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"], +AssetEvent, +).dynamic_depends("timestamp") Review Comment: I think we should remove "timestamp" here as there is no default set in the docs: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dataset_events ## tests/api_fastapi/core_api/routes/public/test_assets.py: ## @@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client): assert response.status_code == 200 assert len(response.json()["assets"]) == 100 + + +class TestGetAssetsEvents(TestAssets): Review Comment: ```suggestion class TestGetAssetEvents(TestAssets): ``` ## airflow/api_fastapi/core_api/routes/public/assets.py: ## @@ -69,3 +79,46 @@ def get_assets( assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets], total_entries=total_entries, ) + + +@assets_router.get( +"/events", +responses=create_openapi_http_exception_doc([401, 403, 404]), +) +async def get_asset_events( +limit: QueryLimit, +offset: QueryOffset, +order_by: Annotated[ +SortParam, +Depends( +SortParam( +["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"], +AssetEvent, +).dynamic_depends("timestamp") +), +], +asset_id: QueryAssetIdFilter, +source_dag_id: QuerySourceDagIdFilter, +source_task_id: QuerySourceTaskIdFilter, +source_run_id: QuerySourceRunIdFilter, 
+source_map_index: QuerySourceMapIndexFilter, +session: Annotated[Session, Depends(get_session)], +) -> AssetEventCollectionResponse: +"""Get assets events.""" Review Comment: ```suggestion """Get asset events.""" ``` ## airflow/api_fastapi/core_api/datamodels/assets.py: ## @@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel): assets: list[AssetResponse] total_entries: int + + +class DagRunAssetReference(BaseModel): +"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel.""" + +run_id: str +dag_id: str +execution_date: datetime = Field(alias="logical_date") +start_date: datetime +end_date: datetime +state: str +data_interval_start: datetime +data_interval_end: datetime + + +class AssetEventResponse(BaseModel): +"""Asset event serializer for responses.""" + +id: int +asset_id: int +asset_uri: str +extra: dict | None = None +source_task_id: str | None = None +source_dag_id: str | None = None +source_run_id: str | None = None +source_map_index: int +created_dagruns: list[DagRunAssetReference] +timestamp: datetime + +@model_validator(mode="before") +def rename_uri_to_asset_uri(cls, values): +"""Rename 'uri' to 'asset_uri' during serialization.""" Review Comment: ```suggestion """Rename 'uri' to 'asset_uri' during serialization to match legacy response.""" ``` ## airflow/api_fastapi/core_api/routes/public/__init__.py: ## @@ -58,5 +58,6 @@ public_router.include_router(variables_router) public_router.include_router(version_router) public_router.include_router(dag_stats_router) +public_router.include_router(assets_router) Review Comment: This one is added on line 63 also. 
Remove it from here ## tests/api_fastapi/core_api/routes/public/test_assets.py: ## @@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client): assert response.status_code == 200 assert len(response.json()["assets"]) == 100 + + +class TestGetAssetsEvents(TestAssets): Review Comment: Comparing with test_asset_endpoint.py from legacy API, can we try to maintain similar functions? I think these are missing/renamed: 1. `test_filtering` -> has general filtering compared to `test_filter_events_by_asset_id` 2. `test_includes_created_dagrun`
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on PR #43881:
URL: https://github.com/apache/airflow/pull/43881#issuecomment-2471364192

> Can you rebase the branch and resolve conflicts. The base branch has been merged as well, it should simplify the review, thanks.

@pierrejeambrun resolved conflicts
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
pierrejeambrun commented on PR #43881:
URL: https://github.com/apache/airflow/pull/43881#issuecomment-2471079934

Can you rebase the branch and resolve conflicts. The base branch has been merged as well, it should simplify the review, thanks.
Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]
vatsrahul1001 commented on PR #43881:
URL: https://github.com/apache/airflow/pull/43881#issuecomment-2469667672

Note: Kindly review the changes after this [commit](https://github.com/apache/airflow/pull/43881/commits/8b6b09ea58b384f96a6cdd07dd6a1ad43fd41856). I had to build on top of this [PR](https://github.com/apache/airflow/pull/43783) as it's still not merged.