Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-14 Thread via GitHub


pierrejeambrun merged PR #43881:
URL: https://github.com/apache/airflow/pull/43881


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-14 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1841781578


##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -74,6 +84,56 @@ def get_assets(
 )
 
 
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),

Review Comment:
   Done






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-14 Thread via GitHub


pierrejeambrun commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1841771388


##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -74,6 +84,56 @@ def get_assets(
 )
 
 
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),

Review Comment:
   401 and 403 have been added to the root router, so all routes now inherit them and we no longer need to specify them. Only other errors, such as 404, need to be specified.






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840921170


##
airflow/api_fastapi/core_api/datamodels/assets.py:
##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
 
 assets: list[AssetResponse]
 total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""

Review Comment:
   Done






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840920332


##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):
+def test_should_respond_200(self, test_client, session):
+self.create_assets()
+self.create_assets_events()
+self.create_dag_run()
+self.create_asset_dag_run()
+assets = session.query(AssetEvent).all()
+assert len(assets) == 2
+response = test_client.get("/public/assets/events")
+assert response.status_code == 200
+response_data = response.json()
+assert response_data == {
+"asset_events": [
+{
+"id": 1,
+"asset_id": 1,
+"asset_uri": "s3://bucket/key/1",
+"extra": {"foo": "bar"},
+"source_task_id": "source_task_id",
+"source_dag_id": "source_dag_id",
+"source_run_id": "source_run_id_1",
+"source_map_index": -1,
+"created_dagruns": [
+{
+"run_id": "source_run_id_1",
+"dag_id": "source_dag_id",
+"logical_date": "2020-06-11T18:00:00Z",
+"start_date": "2020-06-11T18:00:00Z",
+"end_date": "2020-06-11T18:00:00Z",
+"state": "success",
+"data_interval_start": "2020-06-11T18:00:00Z",
+"data_interval_end": "2020-06-11T18:00:00Z",
+}
+],
+"timestamp": "2020-06-11T18:00:00Z",
+},
+{
+"id": 2,
+"asset_id": 2,
+"asset_uri": "s3://bucket/key/2",
+"extra": {"foo": "bar"},
+"source_task_id": "source_task_id",
+"source_dag_id": "source_dag_id",
+"source_run_id": "source_run_id_2",
+"source_map_index": -1,
+"created_dagruns": [
+{
+"run_id": "source_run_id_2",
+"dag_id": "source_dag_id",
+"logical_date": "2020-06-11T18:00:00Z",
+"start_date": "2020-06-11T18:00:00Z",
+"end_date": "2020-06-11T18:00:00Z",
+"state": "success",
+"data_interval_start": "2020-06-11T18:00:00Z",
+"data_interval_end": "2020-06-11T18:00:00Z",
+}
+],
+"timestamp": "2020-06-11T18:00:00Z",
+},
+],
+"total_entries": 2,
+}
+
+@pytest.mark.parametrize(
+"filter_type, filter_value, total_entries",
+[
+("asset_id", "2", 1),
+("source_dag_id", "source_dag_id", 2),
+("source_task_id", "source_task_id", 2),
+("source_run_id", "source_run_id_1", 1),
+("source_map_index", "-1", 2),
+],
+)
+@provide_session
+def test_filtering(self, test_client, filter_type, filter_value, total_entries, session):

Review Comment:
   added `test_limit_and_offset` test
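To make the parametrized expectations concrete, here is a stdlib-only sketch of the filtering and limit/offset semantics these tests exercise. The in-memory `EVENTS` list (copied from the two fixture events in the expected response) and the `filter_events` helper are hypothetical stand-ins for the database query; query-string values arrive as strings and are coerced to integers where needed, as FastAPI would do for integer parameters. The exact values used by the PR's `test_limit_and_offset` are not shown in this thread.

```python
from typing import Any

# Hypothetical in-memory copy of the two fixture events from the expected response.
EVENTS: list[dict[str, Any]] = [
    {
        "id": 1,
        "asset_id": 1,
        "source_dag_id": "source_dag_id",
        "source_task_id": "source_task_id",
        "source_run_id": "source_run_id_1",
        "source_map_index": -1,
    },
    {
        "id": 2,
        "asset_id": 2,
        "source_dag_id": "source_dag_id",
        "source_task_id": "source_task_id",
        "source_run_id": "source_run_id_2",
        "source_map_index": -1,
    },
]

# Filters whose query-string values must be coerced to int.
INT_FIELDS = {"asset_id", "source_map_index"}


def filter_events(params: dict[str, str], limit: int = 100, offset: int = 0) -> dict[str, Any]:
    """Apply equality filters, count all matches, then paginate (hypothetical helper)."""
    matches = [
        event
        for event in EVENTS
        if all(
            event[name] == (int(raw) if name in INT_FIELDS else raw)
            for name, raw in params.items()
        )
    ]
    # total_entries counts every match, independent of limit/offset.
    return {"asset_events": matches[offset : offset + limit], "total_entries": len(matches)}


# Expected totals copied from the parametrize table of test_filtering.
CASES = [
    ("asset_id", "2", 1),
    ("source_dag_id", "source_dag_id", 2),
    ("source_task_id", "source_task_id", 2),
    ("source_run_id", "source_run_id_1", 1),
    ("source_map_index", "-1", 2),
]
for filter_type, filter_value, total_entries in CASES:
    assert filter_events({filter_type: filter_value})["total_entries"] == total_entries

# One row per page, second page: total_entries still reports both matches.
page = filter_events({}, limit=1, offset=1)
print(page["total_entries"], [event["id"] for event in page["asset_events"]])
```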




Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840802901


##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,52 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+limit: QueryLimit,
+offset: QueryOffset,
+order_by: Annotated[
+SortParam,
+Depends(
+SortParam(
+[
+"source_task_id",
+"source_dag_id",
+"source_run_id",
+"source_map_index",
+"timestamp",
+],
+AssetEvent,
+).dynamic_depends("timestamp")
+),
+],
+asset_id: QueryAssetIdFilter,
+source_dag_id: QuerySourceDagIdFilter,
+source_task_id: QuerySourceTaskIdFilter,
+source_run_id: QuerySourceRunIdFilter,
+source_map_index: QuerySourceMapIndexFilter,
+session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+"""Get asset events."""
+assets_event_select, total_entries = paginated_select(
+select(AssetEvent),
+filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index],
+order_by=order_by,
+offset=offset,
+limit=limit,
+session=session,
+)
+
+assets_events = session.scalars(assets_event_select).all()

Review Comment:
   Done






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840802410


##
airflow/api_fastapi/core_api/datamodels/assets.py:
##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
 
 assets: list[AssetResponse]
 total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+run_id: str
+dag_id: str
+execution_date: datetime = Field(alias="logical_date")
+start_date: datetime
+end_date: datetime
+state: str
+data_interval_start: datetime
+data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+"""Asset event serializer for responses."""
+
+id: int
+asset_id: int
+asset_uri: str
+extra: dict | None = None
+source_task_id: str | None = None
+source_dag_id: str | None = None
+source_run_id: str | None = None
+source_map_index: int
+created_dagruns: list[DagRunAssetReference]
+timestamp: datetime
+
+@model_validator(mode="before")
+def rename_uri_to_asset_uri(cls, values):
+"""Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
+if hasattr(values, "uri") and values.uri:
+values.asset_uri = values.uri
+return values

Review Comment:
   Done
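The hunk above aliases `execution_date` to `logical_date`, which is why the tests expect a `logical_date` key. A minimal pydantic v2 sketch (the trimmed `DagRunRef` model and the `SimpleNamespace` row are hypothetical stand-ins) showing the two effects of such an alias: validation, including `from_attributes` validation, reads the aliased name from the source object, and dumping with `by_alias=True`, which FastAPI does by default for response models, emits it.

```python
from datetime import datetime, timezone
from types import SimpleNamespace

from pydantic import BaseModel, Field


class DagRunRef(BaseModel):
    """Trimmed, hypothetical copy of DagRunAssetReference."""

    run_id: str
    # Python attribute is execution_date; the alias is used for input and by_alias output.
    execution_date: datetime = Field(alias="logical_date")


# With from_attributes=True, pydantic looks up the alias ("logical_date") on the object.
row = SimpleNamespace(
    run_id="source_run_id_1",
    logical_date=datetime(2020, 6, 11, 18, tzinfo=timezone.utc),
)
ref = DagRunRef.model_validate(row, from_attributes=True)

print(ref.execution_date)             # accessed by field name in Python code
print(ref.model_dump(by_alias=True))  # serialized under the alias key
print(ref.model_dump())               # without by_alias, the field name is used
```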



##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,52 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(

Review Comment:
   Done






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840800540


##
airflow/api_fastapi/core_api/datamodels/assets.py:
##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
 
 assets: list[AssetResponse]
 total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+run_id: str
+dag_id: str
+execution_date: datetime = Field(alias="logical_date")
+start_date: datetime
+end_date: datetime
+state: str
+data_interval_start: datetime
+data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+"""Asset event serializer for responses."""
+
+id: int
+asset_id: int
+asset_uri: str
+extra: dict | None = None
+source_task_id: str | None = None
+source_dag_id: str | None = None
+source_run_id: str | None = None
+source_map_index: int
+created_dagruns: list[DagRunAssetReference]
+timestamp: datetime
+
+@model_validator(mode="before")
+def rename_uri_to_asset_uri(cls, values):
+"""Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
+if hasattr(values, "uri") and values.uri:
+values.asset_uri = values.uri
+return values
+
+
+class AssetEventCollectionResponse(BaseModel):
+"""Asset collection response."""

Review Comment:
   Updated doc string






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


pierrejeambrun commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840730250


##
airflow/api_fastapi/core_api/datamodels/assets.py:
##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
 
 assets: list[AssetResponse]
 total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""

Review Comment:
   ```suggestion
   """DAGRun serializer for asset responses."""
   ```
   
   (We need to remove the ORM SQLAlchemy reference. This does not only serialize SQLAlchemy objects.)



##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):
+def test_should_respond_200(self, test_client, session):
+self.create_assets()
+self.create_assets_events()
+self.create_dag_run()
+self.create_asset_dag_run()
+assets = session.query(AssetEvent).all()
+assert len(assets) == 2
+response = test_client.get("/public/assets/events")
+assert response.status_code == 200
+response_data = response.json()
+assert response_data == {
+"asset_events": [
+{
+"id": 1,
+"asset_id": 1,
+"asset_uri": "s3://bucket/key/1",
+"extra": {"foo": "bar"},
+"source_task_id": "source_task_id",
+"source_dag_id": "source_dag_id",
+"source_run_id": "source_run_id_1",
+"source_map_index": -1,
+"created_dagruns": [
+{
+"run_id": "source_run_id_1",
+"dag_id": "source_dag_id",
+"logical_date": "2020-06-11T18:00:00Z",
+"start_date": "2020-06-11T18:00:00Z",
+"end_date": "2020-06-11T18:00:00Z",
+"state": "success",
+"data_interval_start": "2020-06-11T18:00:00Z",
+"data_interval_end": "2020-06-11T18:00:00Z",
+}
+],
+"timestamp": "2020-06-11T18:00:00Z",
+},
+{
+"id": 2,
+"asset_id": 2,
+"asset_uri": "s3://bucket/key/2",
+"extra": {"foo": "bar"},
+"source_task_id": "source_task_id",
+"source_dag_id": "source_dag_id",
+"source_run_id": "source_run_id_2",
+"source_map_index": -1,
+"created_dagruns": [
+{
+"run_id": "source_run_id_2",
+"dag_id": "source_dag_id",
+"logical_date": "2020-06-11T18:00:00Z",
+"start_date": "2020-06-11T18:00:00Z",
+"end_date": "2020-06-11T18:00:00Z",
+"state": "success",
+"data_interval_start": "2020-06-11T18:00:00Z",
+"data_interval_end": "2020-06-11T18:00:00Z",
+}
+],
+"timestamp": "2020-06-11T18:00:00Z",
+},
+],
+"total_entries": 2,
+}
+
+@pytest.mark.parametrize(
+"filter_type, filter_value, total_entries",
+[
+("asset_id", "2", 1),
+("source_dag_id", "source_dag_id", 2),
+("source_task_id", "source_task_id", 2),
+("source_run_id", "source_run_id_1", 1),
+("source_map_index", "-1", 2),
+],
+)
+@provide_session
+def test_filtering(self, test_client, filter_type, filter_value, total_entries, session):
+self.create_assets()
+self.create_assets_events()
+self.create_dag_run()
+self.create_asset_dag_run()
+response = test_client.get(f"/public/assets/events?{filter_type}={filter_value}")

Review Comment:
   Don't format the URL manually; use `params={}` with a dict of parameters and leave that to the test_client. (Some values need encoding, etc.)
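A stdlib sketch of why manual formatting is fragile. The run ID below is an illustrative value containing `+` and `:`, not one of the fixtures: left unencoded, `+` is decoded as a space on the receiving side, while `urllib.parse.urlencode` escapes it so the value round-trips. Passing `params=` lets the test client perform this kind of encoding itself.

```python
from urllib.parse import parse_qs, urlencode

run_id = "manual__2020-06-11T18:00:00+00:00"  # illustrative run ID containing '+' and ':'

# Manual f-string formatting: '+' is left raw and decodes as a space.
naive_query = f"source_run_id={run_id}"
naive_value = parse_qs(naive_query)["source_run_id"][0]
print(naive_value)

# Proper encoding: '+' becomes %2B and ':' becomes %3A, so the value survives.
encoded_query = urlencode({"source_run_id": run_id})
print(encoded_query)
print(parse_qs(encoded_query)["source_run_id"][0] == run_id)
```

In the test itself this becomes `test_client.get("/public/assets/events", params={filter_type: filter_value})`.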




Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


pierrejeambrun commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840683048


##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,52 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+limit: QueryLimit,
+offset: QueryOffset,
+order_by: Annotated[
+SortParam,
+Depends(
+SortParam(
+[
+"source_task_id",
+"source_dag_id",
+"source_run_id",
+"source_map_index",
+"timestamp",
+],
+AssetEvent,
+).dynamic_depends("timestamp")
+),
+],
+asset_id: QueryAssetIdFilter,
+source_dag_id: QuerySourceDagIdFilter,
+source_task_id: QuerySourceTaskIdFilter,
+source_run_id: QuerySourceRunIdFilter,
+source_map_index: QuerySourceMapIndexFilter,
+session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+"""Get asset events."""
+assets_event_select, total_entries = paginated_select(
+select(AssetEvent),
+filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index],
+order_by=order_by,
+offset=offset,
+limit=limit,
+session=session,
+)
+
+assets_events = session.scalars(assets_event_select).all()

Review Comment:
   The original loading `options` for the relationship have been removed. They are important to keep, as they make serialization faster and prevent multiple DB queries from being emitted lazily.
   
   You need to add back the `query.options(subqueryload(AssetEvent.created_dagruns))`



##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,52 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(

Review Comment:
   The definition of routes changed recently in https://github.com/apache/airflow/pull/43797
   
   Routes should be declared `sync`:
   ```suggestion
   def get_asset_events(
   ```
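A hedged, stdlib-only illustration of why declaring routes with plain `def` can matter (the thread does not spell out the rationale of PR #43797; this is an assumption). FastAPI runs `async def` endpoints directly on the event loop but runs plain `def` endpoints in a worker thread pool, so a blocking call, such as a synchronous database query, inside an `async def` endpoint stalls every other request. The sketch mimics both modes with `asyncio`, using `asyncio.to_thread` as a stand-in for FastAPI's thread pool.

```python
import asyncio
import time


def blocking_query() -> str:
    # Stand-in for a synchronous DB call such as session.scalars(...).all().
    time.sleep(0.2)
    return "rows"


async def async_endpoint() -> str:
    # Like an `async def` route doing sync I/O: it blocks the event loop.
    return blocking_query()


async def sync_endpoint_in_threadpool() -> str:
    # Like a plain `def` route: the work is offloaded to a worker thread.
    return await asyncio.to_thread(blocking_query)


async def serve_two_requests(handler) -> float:
    """Run two "requests" concurrently and return the wall-clock time taken."""
    start = time.perf_counter()
    await asyncio.gather(handler(), handler())
    return time.perf_counter() - start


blocked = asyncio.run(serve_two_requests(async_endpoint))
offloaded = asyncio.run(serve_two_requests(sync_endpoint_in_threadpool))
print(f"async def + blocking call: {blocked:.2f}s, offloaded: {offloaded:.2f}s")
```

The blocking version serializes the two requests (about 0.4 s), while the offloaded version overlaps them (about 0.2 s).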



##
airflow/api_fastapi/core_api/datamodels/assets.py:
##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
 
 assets: list[AssetResponse]
 total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+run_id: str
+dag_id: str
+execution_date: datetime = Field(alias="logical_date")
+start_date: datetime
+end_date: datetime
+state: str
+data_interval_start: datetime
+data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+"""Asset event serializer for responses."""
+
+id: int
+asset_id: int
+asset_uri: str
+extra: dict | None = None
+source_task_id: str | None = None
+source_dag_id: str | None = None
+source_run_id: str | None = None
+source_map_index: int
+created_dagruns: list[DagRunAssetReference]
+timestamp: datetime
+
+@model_validator(mode="before")
+def rename_uri_to_asset_uri(cls, values):
+"""Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
+if hasattr(values, "uri") and values.uri:
+values.asset_uri = values.uri
+return values

Review Comment:
   We can remove this and simply have `uri` in the response. Going from `dataset_uri` to `asset_uri` is already a breaking change, so we might as well take the opportunity to make it just `uri`.
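A minimal pydantic v2 sketch of this suggestion, with a trimmed, hypothetical `AssetEventSketch` model and a `SimpleNamespace` standing in for the ORM row: once the field is simply named `uri`, `from_attributes` validation reads it directly, so no `model_validator` is needed, and nothing is written back onto the source object (the quoted validator does this with `values.asset_uri = values.uri`).

```python
from types import SimpleNamespace

from pydantic import BaseModel


class AssetEventSketch(BaseModel):
    """Trimmed, hypothetical version of AssetEventResponse exposing `uri` directly."""

    id: int
    asset_id: int
    uri: str  # same name as the attribute on the source object; no rename step
    source_map_index: int


# Stand-in for an ORM AssetEvent row that exposes a `uri` attribute.
row = SimpleNamespace(id=1, asset_id=1, uri="s3://bucket/key/1", source_map_index=-1)

asset_event = AssetEventSketch.model_validate(row, from_attributes=True)
print(asset_event.model_dump())
```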




Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


amoghrajesh commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840351144


##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):

Review Comment:
   Yes it looks fine now






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840306996


##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,46 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+limit: QueryLimit,
+offset: QueryOffset,
+order_by: Annotated[
+SortParam,
+Depends(
+SortParam(
+["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"],
+AssetEvent,
+).dynamic_depends("timestamp")

Review Comment:
   Resolving this conversation, as the legacy API behaves the same way






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840306325


##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):

Review Comment:
   Can we resolve this conversation as per our discussion, @amoghrajesh?






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840175375


##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):

Review Comment:
   @amoghrajesh here we are just verifying that order by raises 400 for any invalid attr provided. Do you want me to cover any specific case here? Also, I see the legacy [test](https://github.com/astronomer/airflow/blob/dc4def7c87441554ff106e65ec2b7894cdba7b0e/tests/api_connexion/endpoints/test_asset_endpoint.py#L180) only validates the same thing.






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


amoghrajesh commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840121304


##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetsEvents(TestAssets):

Review Comment:
   Ok great makes sense. 



##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,46 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+limit: QueryLimit,
+offset: QueryOffset,
+order_by: Annotated[
+SortParam,
+Depends(
+SortParam(
+["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"],
+AssetEvent,
+).dynamic_depends("timestamp")

Review Comment:
   Is that how the legacy API behaves? If that's the legacy behaviour, let's keep it that way.



##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEvents(TestAssets):

Review Comment:
   Let us cover the cases for:
   `test_order_by_raises_400_for_invalid_attr`






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1839862686


##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetsEvents(TestAssets):

Review Comment:
   #1 renamed to `test_filtering`
   #2 I am already verifying the created dagrun 
[here](https://github.com/astronomer/airflow/blob/674aff9f5643a31779bfd2584d1140b23772f1a4/tests/api_fastapi/core_api/routes/public/test_assets.py#L309)
 






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-13 Thread via GitHub


vatsrahul1001 commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1839843890


##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,46 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+limit: QueryLimit,
+offset: QueryOffset,
+order_by: Annotated[
+SortParam,
+Depends(
+SortParam(
+["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"],
+AssetEvent,
+).dynamic_depends("timestamp")

Review Comment:
   I saw [this](https://github.com/apache/airflow/blob/main/airflow/api_connexion/endpoints/asset_endpoint.py#L124) in the legacy code, which is why I made this the default. I also noticed that when I remove the default from timestamp, it uses `id` as the default.
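A stdlib-only sketch of the ordering behaviour under discussion: an allow-list of sortable attributes (copied from the `SortParam` list in the route), `timestamp` as the fallback when no `order_by` is given, and rejection of anything else, which the endpoint surfaces as HTTP 400. `resolve_order_by` and its error message are hypothetical and only model the behaviour; they are not Airflow's `SortParam` implementation.

```python
from __future__ import annotations

# Sortable attributes as listed in the route's SortParam.
ALLOWED = ["source_task_id", "source_dag_id", "source_run_id", "source_map_index", "timestamp"]


def resolve_order_by(order_by: str | None, allowed: list[str], default: str) -> str:
    """Return the attribute to sort by, falling back to `default` (hypothetical helper)."""
    attr = order_by or default
    if attr not in allowed:
        # The real endpoint answers this case with HTTP 400.
        raise ValueError(f"Ordering by {attr!r} is not allowed")
    return attr


events = [
    {"id": 1, "timestamp": "2020-06-11T18:05:00Z"},
    {"id": 2, "timestamp": "2020-06-11T18:00:00Z"},
]

key = resolve_order_by(None, ALLOWED, default="timestamp")
print([e["id"] for e in sorted(events, key=lambda e: e[key])])

try:
    resolve_order_by("not_a_column", ALLOWED, default="timestamp")
except ValueError as exc:
    print(exc)
```

Without an explicit default, the sort key would have to come from somewhere else (the comment above observes `id` being used), which is why the default matters for response stability.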






Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-12 Thread via GitHub


amoghrajesh commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1839620372


##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,46 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for 
asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+limit: QueryLimit,
+offset: QueryOffset,
+order_by: Annotated[
+SortParam,
+Depends(
+SortParam(
+["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"],
+AssetEvent,
+).dynamic_depends("timestamp")

Review Comment:
   I think we should remove "timestamp" here, as there is no default set in the docs: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dataset_events



##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetsEvents(TestAssets):

Review Comment:
   ```suggestion
   class TestGetAssetEvents(TestAssets):
   ```



##
airflow/api_fastapi/core_api/routes/public/assets.py:
##
@@ -69,3 +79,46 @@ def get_assets(
 assets=[AssetResponse.model_validate(asset, from_attributes=True) for 
asset in assets],
 total_entries=total_entries,
 )
+
+
+@assets_router.get(
+"/events",
+responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+limit: QueryLimit,
+offset: QueryOffset,
+order_by: Annotated[
+SortParam,
+Depends(
+SortParam(
+["timestamp", "source_dag_id", "source_task_id", "source_run_id", "source_map_index"],
+AssetEvent,
+).dynamic_depends("timestamp")
+),
+],
+asset_id: QueryAssetIdFilter,
+source_dag_id: QuerySourceDagIdFilter,
+source_task_id: QuerySourceTaskIdFilter,
+source_run_id: QuerySourceRunIdFilter,
+source_map_index: QuerySourceMapIndexFilter,
+session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+"""Get assets events."""

Review Comment:
   ```suggestion
   """Get asset events."""
   ```



##
airflow/api_fastapi/core_api/datamodels/assets.py:
##
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
 
 assets: list[AssetResponse]
 total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+run_id: str
+dag_id: str
+execution_date: datetime = Field(alias="logical_date")
+start_date: datetime
+end_date: datetime
+state: str
+data_interval_start: datetime
+data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+"""Asset event serializer for responses."""
+
+id: int
+asset_id: int
+asset_uri: str
+extra: dict | None = None
+source_task_id: str | None = None
+source_dag_id: str | None = None
+source_run_id: str | None = None
+source_map_index: int
+created_dagruns: list[DagRunAssetReference]
+timestamp: datetime
+
+@model_validator(mode="before")
+def rename_uri_to_asset_uri(cls, values):
+"""Rename 'uri' to 'asset_uri' during serialization."""

Review Comment:
   ```suggestion
   """Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
   ```



##
airflow/api_fastapi/core_api/routes/public/__init__.py:
##
@@ -58,5 +58,6 @@
 public_router.include_router(variables_router)
 public_router.include_router(version_router)
 public_router.include_router(dag_stats_router)
+public_router.include_router(assets_router)

Review Comment:
   This one is also added on line 63. Remove it from here.



##
tests/api_fastapi/core_api/routes/public/test_assets.py:
##
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self, test_client):
 
 assert response.status_code == 200
 assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetsEvents(TestAssets):

Review Comment:
   Comparing with test_asset_endpoint.py from the legacy API, can we try to maintain similar functions?
   
   I think these are missing/renamed:
   1. `test_filtering` -> has general filtering compared to `test_filter_events_by_asset_id`
   2. `test_includes_created_dagrun`




Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-12 Thread via GitHub


vatsrahul1001 commented on PR #43881:
URL: https://github.com/apache/airflow/pull/43881#issuecomment-2471364192

   > Can you rebase the branch and resolve conflicts? The base branch has been merged as well, which should simplify the review. Thanks.
   
   @pierrejeambrun resolved conflicts





Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-12 Thread via GitHub


pierrejeambrun commented on PR #43881:
URL: https://github.com/apache/airflow/pull/43881#issuecomment-2471079934

   Can you rebase the branch and resolve conflicts? The base branch has been merged as well, which should simplify the review. Thanks.





Re: [PR] AIP 84: Migrate GET ASSET EVENTS legacy API to fast API [airflow]

2024-11-11 Thread via GitHub


vatsrahul1001 commented on PR #43881:
URL: https://github.com/apache/airflow/pull/43881#issuecomment-2469667672

   Note: Kindly review after [commit](https://github.com/apache/airflow/pull/43881/commits/8b6b09ea58b384f96a6cdd07dd6a1ad43fd41856). I had to build on top of [PR](https://github.com/apache/airflow/pull/43783) as it's still not merged.

