Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
potiuk merged PR #45634: URL: https://github.com/apache/airflow/pull/45634 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
boring-cyborg[bot] commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2634399656

Awesome work, congrats on your first merged pull request! You are invited to check our [Issue Tracker](https://github.com/apache/airflow/issues) for additional contributions.
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2633570050

> Looks good to me.

Thanks for your prompt reply. I'll keep it open for one to two days so others can take a look. [Fixed one incorrect "include" path in docs](https://github.com/apache/airflow/pull/45634/commits/670aa4f3bccc34be6f956c54bf1d08a067973842). Checks should be all green now. Thanks!
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2633288207

@Lee-W rebasing is done, please have a look again. Thanks!
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
Lee-W commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2630237786

> @josh-fell @Lee-W thank you for the code review, I've addressed all feedback. Could you please look again?
>
> BTW. What do you think regarding @jaklan's suggestion ([#45634 (review)](https://github.com/apache/airflow/pull/45634#pullrequestreview-2557110660))? My thoughts:
>
> 1. this (implicit argument types) feels wrong/confusing:
>
> ```python
> self.job: str | int,
> self.project: str | int,
> self.environment: str | int,
> ```
>
> I think the params should be explicit, so I would go with:
>
> ```python
> self.job_id: int,
> self.project_id: int,
> self.environment_id: int,
> self.job_name: str,
> self.project_name: str,
> self.environment_name: str,
> ```
>
> 2. I see this as an enhancement that could allow more flexible setups, but I would prefer to release this PR "as is" and unblock using `project_name`, `environment_name`, and `job_name` ASAP. I would create a new PR with the suggested enhancements.

Hi, sorry for the late reply. I prefer this idea more. It would be great if you could rebase from the main branch and resolve the conflict, then I'll review it again. Thanks a lot!
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2619507526

> @josh-fell @Lee-W thank you for the code review, I've addressed all feedback. Could you please look again?
>
> BTW. What do you think regarding @jaklan's suggestion ([#45634 (review)](https://github.com/apache/airflow/pull/45634#pullrequestreview-2557110660))? My thoughts:
>
> 1. this (implicit argument types) feels wrong/confusing:
>
> ```python
> self.job: str | int,
> self.project: str | int,
> self.environment: str | int,
> ```
>
> I think the params should be explicit, so I would go with:
>
> ```python
> self.job_id: int,
> self.project_id: int,
> self.environment_id: int,
> self.job_name: str,
> self.project_name: str,
> self.environment_name: str,
> ```
>
> 2. I see this as an enhancement that could allow more flexible setups, but I would prefer to release this PR "as is" and unblock using `project_name`, `environment_name`, and `job_name` ASAP. I would create a new PR with the suggested enhancements.

New PR that includes the changes described above: https://github.com/apache/airflow/pull/46184

Feel free to select just one of my PRs and close the other one.
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2615279528

@josh-fell @Lee-W thank you for the code review, I've addressed all feedback. Could you please look again?

BTW. What do you think regarding @jaklan's suggestion (https://github.com/apache/airflow/pull/45634#pullrequestreview-2557110660)? My thoughts:

1. this (implicit argument types) feels wrong/confusing:

```py
self.job: str | int,
self.project: str | int,
self.environment: str | int,
```

I think the params should be explicit, so I would go with:

```py
self.job_id: int,
self.project_id: int,
self.environment_id: int,
self.job_name: str,
self.project_name: str,
self.environment_name: str,
```

2. I see this as an enhancement that could allow more flexible setups, but I would prefer to release this PR "as is" and unblock using `project_name`, `environment_name`, and `job_name` ASAP. I would create a new PR with the suggested enhancements.
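The "either an ID or the full name triple" idea discussed in this thread can be sketched as a small standalone class. The class name, fields, and helper method below are illustrative assumptions, not the merged operator API:

```python
# Sketch of the explicit-parameter approach from the discussion above.
# DbtCloudJobParams and is_resolvable() are hypothetical names.
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DbtCloudJobParams:
    """A job is addressed either by job_id or by the full name triple."""

    job_id: int | None = None
    project_name: str | None = None
    environment_name: str | None = None
    job_name: str | None = None

    def is_resolvable(self) -> bool:
        # Resolvable directly by ID, or by looking the job up via
        # project name -> environment name -> job name.
        return self.job_id is not None or all(
            [self.project_name, self.environment_name, self.job_name]
        )
```

The point of the explicit fields is that each one has a single unambiguous type, instead of `str | int` parameters whose meaning depends on runtime inspection.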
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1930165305

## providers/src/airflow/providers/dbt/cloud/operators/dbt.py:

```diff
@@ -135,6 +148,18 @@ def execute(self, context: Context):
             f"Triggered via Apache Airflow by task {self.task_id!r} in the {self.dag.dag_id} DAG."
         )
 
+        if self.job_id is None:
+            if not all([self.project_name, self.environment_name, self.job_name]):
+                raise AirflowException(
```

Review Comment: I'll use the built-in `ValueError` here since it is suitable for this use case. I believe this is in line with the discussion in Slack (https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1737614696625489).
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1930145227

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons
         """
         return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3")
 
+    @fallback_to_default_account
+    def list_environments(
+        self, project_id: int, name_contains: str | None = None, account_id: int | None = None
+    ) -> list[Response]:
+        """
+        Retrieve metadata for all environments tied to a specified dbt Cloud project.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud environment name to filter by.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: List of request responses.
+        """
+        payload = {"name__icontains": name_contains} if name_contains else None
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/",
+            payload=payload,
+            paginate=True,
+            api_version="v3",
+        )
+
+    @fallback_to_default_account
+    def get_environment(
+        self, project_id: int, environment_id: int, account_id: int | None = None
+    ) -> Response:
+        """
+        Retrieve metadata for a specific project's environment.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param environment_id: The ID of a dbt Cloud environment.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The request response.
+        """
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/{environment_id}/", api_version="v3"
+        )
+
     @fallback_to_default_account
     def list_jobs(
         self,
         account_id: int | None = None,
         order_by: str | None = None,
         project_id: int | None = None,
+        environment_id: int | None = None,
+        name_contains: str | None = None,
     ) -> list[Response]:
         """
         Retrieve metadata for all jobs tied to a specified dbt Cloud account.
 
         If a ``project_id`` is supplied, only jobs pertaining to this project will be retrieved.
+        If an ``environment_id`` is supplied, only jobs pertaining to this environment will be retrieved.
 
         :param account_id: Optional. The ID of a dbt Cloud account.
         :param order_by: Optional. Field to order the result by. Use '-' to indicate reverse order.
             For example, to use reverse order by the run ID use ``order_by=-id``.
-        :param project_id: The ID of a dbt Cloud project.
+        :param project_id: Optional. The ID of a dbt Cloud project.
+        :param environment_id: Optional. The ID of a dbt Cloud environment.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud job name to filter by.
```

Review Comment: In the dbt Cloud API, this argument is `name__icontains` (two underscores), which (IMO) did not feel like a good fit for Airflow's naming-convention style; that's why I changed it to the simpler `name_contains`.
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
joellabes commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2609019669

> could you have a look if you are fine with the approach?

I'm sorry that it's necessary (and have linked this internally for us to work on a better strategy!) but I think it's a sensible workaround for now, and is a pattern we've seen customers use before 👍
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
Lee-W commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1926250885

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons
         """
         return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3")
 
+    @fallback_to_default_account
+    def list_environments(
+        self, project_id: int, name_contains: str | None = None, account_id: int | None = None
+    ) -> list[Response]:
+        """
+        Retrieve metadata for all environments tied to a specified dbt Cloud project.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud environment name to filter by.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: List of request responses.
+        """
+        payload = {"name__icontains": name_contains} if name_contains else None
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/",
+            payload=payload,
+            paginate=True,
+            api_version="v3",
+        )
+
+    @fallback_to_default_account
+    def get_environment(
+        self, project_id: int, environment_id: int, account_id: int | None = None
+    ) -> Response:
```

Review Comment: Sounds good. In general, I would suggest using keyword-only arguments for public methods with more than 3 args; it's hard to remember the right order.
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
Lee-W commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1926250127

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons
         """
         return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3")
 
+    @fallback_to_default_account
+    def list_environments(
+        self, project_id: int, name_contains: str | None = None, account_id: int | None = None
+    ) -> list[Response]:
+        """
+        Retrieve metadata for all environments tied to a specified dbt Cloud project.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud environment name to filter by.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: List of request responses.
+        """
+        payload = {"name__icontains": name_contains} if name_contains else None
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/",
+            payload=payload,
+            paginate=True,
+            api_version="v3",
+        )
+
+    @fallback_to_default_account
+    def get_environment(
+        self, project_id: int, environment_id: int, account_id: int | None = None
+    ) -> Response:
+        """
+        Retrieve metadata for a specific project's environment.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param environment_id: The ID of a dbt Cloud environment.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The request response.
+        """
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/{environment_id}/", api_version="v3"
+        )
+
     @fallback_to_default_account
     def list_jobs(
         self,
         account_id: int | None = None,
         order_by: str | None = None,
         project_id: int | None = None,
+        environment_id: int | None = None,
+        name_contains: str | None = None,
     ) -> list[Response]:
         """
         Retrieve metadata for all jobs tied to a specified dbt Cloud account.
 
         If a ``project_id`` is supplied, only jobs pertaining to this project will be retrieved.
+        If an ``environment_id`` is supplied, only jobs pertaining to this environment will be retrieved.
 
         :param account_id: Optional. The ID of a dbt Cloud account.
         :param order_by: Optional. Field to order the result by. Use '-' to indicate reverse order.
             For example, to use reverse order by the run ID use ``order_by=-id``.
-        :param project_id: The ID of a dbt Cloud project.
+        :param project_id: Optional. The ID of a dbt Cloud project.
+        :param environment_id: Optional. The ID of a dbt Cloud environment.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud job name to filter by.
```

Review Comment: Yep, same here. No strong opinion on this one; it probably makes more sense for someone more familiar with the dbt API to decide. Just wanted to confirm 🙂
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
josh-fell commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1925488870

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons
         """
         return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3")
 
+    @fallback_to_default_account
+    def list_environments(
+        self, project_id: int, name_contains: str | None = None, account_id: int | None = None
+    ) -> list[Response]:
+        """
+        Retrieve metadata for all environments tied to a specified dbt Cloud project.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud environment name to filter by.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: List of request responses.
+        """
+        payload = {"name__icontains": name_contains} if name_contains else None
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/",
+            payload=payload,
+            paginate=True,
+            api_version="v3",
+        )
+
+    @fallback_to_default_account
+    def get_environment(
+        self, project_id: int, environment_id: int, account_id: int | None = None
+    ) -> Response:
+        """
+        Retrieve metadata for a specific project's environment.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param environment_id: The ID of a dbt Cloud environment.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The request response.
+        """
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/{environment_id}/", api_version="v3"
+        )
+
     @fallback_to_default_account
     def list_jobs(
         self,
         account_id: int | None = None,
         order_by: str | None = None,
         project_id: int | None = None,
+        environment_id: int | None = None,
+        name_contains: str | None = None,
     ) -> list[Response]:
         """
         Retrieve metadata for all jobs tied to a specified dbt Cloud account.
 
         If a ``project_id`` is supplied, only jobs pertaining to this project will be retrieved.
+        If an ``environment_id`` is supplied, only jobs pertaining to this environment will be retrieved.
 
         :param account_id: Optional. The ID of a dbt Cloud account.
         :param order_by: Optional. Field to order the result by. Use '-' to indicate reverse order.
             For example, to use reverse order by the run ID use ``order_by=-id``.
-        :param project_id: The ID of a dbt Cloud project.
+        :param project_id: Optional. The ID of a dbt Cloud project.
+        :param environment_id: Optional. The ID of a dbt Cloud environment.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud job name to filter by.
```

Review Comment: No strong opinion here, but I don't mind keeping more semantic param names that don't quite match the API if the semantic param name is clearer. The docstring for this param is pretty clear too. Users shouldn't necessarily be cognizant of the APIs behind the hook methods (if we make the APIs in the hooks and operators easy to use and understandable from looking at the provider itself).
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
josh-fell commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1925482067

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons
         """
         return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3")
 
+    @fallback_to_default_account
+    def list_environments(
+        self, project_id: int, name_contains: str | None = None, account_id: int | None = None
+    ) -> list[Response]:
+        """
+        Retrieve metadata for all environments tied to a specified dbt Cloud project.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud environment name to filter by.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: List of request responses.
+        """
+        payload = {"name__icontains": name_contains} if name_contains else None
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/",
+            payload=payload,
+            paginate=True,
+            api_version="v3",
+        )
+
+    @fallback_to_default_account
+    def get_environment(
+        self, project_id: int, environment_id: int, account_id: int | None = None
+    ) -> Response:
```

Review Comment: @Lee-W Were you thinking this method should have `account_id` as a keyword-only arg as well?
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
josh-fell commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1925471395

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -356,14 +356,23 @@ def get_account(self, account_id: int | None = None) -> Response:
         return self._run_and_get_response(endpoint=f"{account_id}/")
 
     @fallback_to_default_account
-    def list_projects(self, account_id: int | None = None) -> list[Response]:
+    def list_projects(
+        self, name_contains: str | None = None, account_id: int | None = None
```

Review Comment: +1 I've seen folks get burned by this before.
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
Lee-W commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1925003561

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -356,14 +356,23 @@ def get_account(self, account_id: int | None = None) -> Response:
         return self._run_and_get_response(endpoint=f"{account_id}/")
 
     @fallback_to_default_account
-    def list_projects(self, account_id: int | None = None) -> list[Response]:
+    def list_projects(
+        self, name_contains: str | None = None, account_id: int | None = None
```

Review Comment:

```suggestion
        self, account_id: int | None = None, name_contains: str | None = None
```

Changing the order might break existing code.

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons
         """
         return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3")
 
+    @fallback_to_default_account
+    def list_environments(
+        self, project_id: int, name_contains: str | None = None, account_id: int | None = None
+    ) -> list[Response]:
+        """
+        Retrieve metadata for all environments tied to a specified dbt Cloud project.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud environment name to filter by.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: List of request responses.
+        """
+        payload = {"name__icontains": name_contains} if name_contains else None
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/",
+            payload=payload,
+            paginate=True,
+            api_version="v3",
+        )
+
+    @fallback_to_default_account
+    def get_environment(
+        self, project_id: int, environment_id: int, account_id: int | None = None
+    ) -> Response:
+        """
+        Retrieve metadata for a specific project's environment.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param environment_id: The ID of a dbt Cloud environment.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The request response.
+        """
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/{environment_id}/", api_version="v3"
+        )
+
     @fallback_to_default_account
     def list_jobs(
         self,
         account_id: int | None = None,
         order_by: str | None = None,
         project_id: int | None = None,
+        environment_id: int | None = None,
+        name_contains: str | None = None,
     ) -> list[Response]:
         """
         Retrieve metadata for all jobs tied to a specified dbt Cloud account.
 
         If a ``project_id`` is supplied, only jobs pertaining to this project will be retrieved.
+        If an ``environment_id`` is supplied, only jobs pertaining to this environment will be retrieved.
 
         :param account_id: Optional. The ID of a dbt Cloud account.
         :param order_by: Optional. Field to order the result by. Use '-' to indicate reverse order.
             For example, to use reverse order by the run ID use ``order_by=-id``.
-        :param project_id: The ID of a dbt Cloud project.
+        :param project_id: Optional. The ID of a dbt Cloud project.
+        :param environment_id: Optional. The ID of a dbt Cloud environment.
+        :param name_contains: Optional. The case-insensitive substring of a dbt Cloud job name to filter by.
```

Review Comment: Should we unify the name as `name__icontains`?

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = None) -> Response:
         """
         return self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        list_projects_responses = self.list_projects(name_contains=project_name, account_id=account_id)
+
```
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1919703513

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = None) -> Response:
         """
         return self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        projects = self.list_projects(name_contains=project_name, account_id=account_id)
```

Review Comment: Thanks for catching that! I've improved the clarity by renaming variables and enhancing comments.
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1919704470

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:

```diff
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = None) -> Response:
         """
         return self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        projects = self.list_projects(name_contains=project_name, account_id=account_id)
+        # flatten & filter the list of responses
+        projects = [
+            project
+            for response in projects
+            for project in response.json()["data"]
+            if project["name"] == project_name
+        ]
+        if len(projects) != 1:
+            raise AirflowException(f"Found {len(projects)} projects with name `{project_name}`.")
+        project_id = projects[0]["id"]
+
+        # get environment_id using project_id and environment_name
+        environments = self.list_environments(
+            project_id=project_id, name_contains=environment_name, account_id=account_id
+        )
+        # flatten & filter the list of responses
+        environments = [
+            env
+            for response in environments
+            for env in response.json()["data"]
+            if env["name"] == environment_name
+        ]
+        if len(environments) != 1:
+            raise AirflowException(
+                f"Found {len(environments)} environments with name `{environment_name}` in project `{project_name}`."
+            )
+        environment_id = environments[0]["id"]
+
+        # get job using project_id, environment_id and job_name
+        list_jobs_responses = self.list_jobs(
+            project_id=project_id,
+            environment_id=environment_id,
+            name_contains=job_name,
+            account_id=account_id,
+        )
+        # flatten & filter the list of responses
+        jobs = [
+            job
+            for response in list_jobs_responses
+            for job in response.json()["data"]
+            if job["name"] == job_name
+        ]
+        if len(jobs) != 1:
+            raise AirflowException(
+                f"Found {len(jobs)} jobs with name `{job_name}` in project `{project_name}` and environment `{environment_name}`."
```

Review Comment: Agreed, implemented your suggestion.
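The flatten-and-filter pattern repeated three times in `get_job_by_name` can be distilled into one helper. This is a sketch over plain dicts rather than the hook's actual `Response` objects, with an assumed helper name and a `ValueError` standing in for the exception choice discussed earlier:

```python
def resolve_unique(pages: list[list[dict]], name: str, kind: str) -> dict:
    """Flatten paginated result pages and return the single exact match.

    Exact-name filtering is needed because the server-side substring
    filter can return near-matches (e.g. "daily-job-2" when searching
    for "daily-job").
    """
    matches = [item for page in pages for item in page if item["name"] == name]
    if len(matches) != 1:
        raise ValueError(f"Found {len(matches)} {kind}s with name `{name}`.")
    return matches[0]
```

Each of the three lookups (project, environment, job) then reduces to one call that either yields the unique entity's `id` or fails loudly on zero or multiple matches.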
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
jaklan commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1919154731

## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py: ##

@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = None) -> Response:
         """
         return self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        projects = self.list_projects(name_contains=project_name, account_id=account_id)
+        # flatten & filter the list of responses
+        projects = [
+            project
+            for response in projects
+            for project in response.json()["data"]
+            if project["name"] == project_name
+        ]
+        if len(projects) != 1:
+            raise AirflowException(f"Found {len(projects)} projects with name `{project_name}`.")
+        project_id = projects[0]["id"]
+
+        # get environment_id using project_id and environment_name
+        environments = self.list_environments(
+            project_id=project_id, name_contains=environment_name, account_id=account_id
+        )
+        # flatten & filter the list of responses
+        environments = [
+            env
+            for response in environments

Review Comment: Same as above
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
ginone commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2595762377

@pikachuev thanks for the feedback! If you look at the code, the `get_job_by_name` function specifically, you will notice there is already an optimization that works much like your suggestion. To minimize response sizes, the lookup is done in 3 steps:

1. `list_projects` using `project_name` as a filter (`name__icontains={project_name}`)
2. `list_environments` using `project_id` as a filter
3. `list_jobs` using both `project_id` & `environment_id` as filters

I decided to use `name__icontains` only for the first step, to limit the response size, since retrieving all projects would not make sense. For the 2nd & 3rd steps my assumption was that such filtering should not be necessary, because each project should have no more than a few environments, and each environment a dozen or so jobs. Now that you mention it, though, I think it would make sense to also use `name__icontains` at least when retrieving jobs, as there may be use cases with 100+ jobs in a single environment, so I'll submit that change shortly.
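The trade-off described above, a case-insensitive `name__icontains` prefilter that shrinks the payload but may still return several candidates, can be illustrated with a small sketch (function name and sample job names are hypothetical):

```python
def icontains_filter(names: list[str], fragment: str) -> list[str]:
    """Mimic the API's `name__icontains` filter: case-insensitive substring match."""
    return [name for name in names if fragment.lower() in name.lower()]


jobs = ["daily_run", "Daily_Run_backfill", "hourly_run"]
# The server-side prefilter reduces the response size but can still match
# several jobs whose names merely contain the fragment...
prefiltered = icontains_filter(jobs, "daily_run")
# ...so an exact comparison is still needed to pin down a single job.
exact = [name for name in prefiltered if name == "daily_run"]
```

This is why the hook's exact `==` filtering remains in place even after the server-side prefilter is added.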
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
pikachuev commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2595680976

Hi @ginone, great proposal and change. In our Airflow environment we use our own helper and retrieve job IDs by name. We found that in our case the payload from the dbt Cloud REST API is quite big to retrieve for every dbt Cloud job trigger. To reduce the payload size, we additionally filter the REST API calls by name directly in the request, for example:

    project_url = (
        f"{base_url_v3}accounts/{dbt_cloud_account_id}/projects/"
        f"?account_id={dbt_cloud_account_id}"
        f"&name__icontains={project_name}"
    )

or

    job_url = (
        f"{base_url_v2}accounts/{dbt_cloud_account_id}/jobs/"
        f"?account_id={dbt_cloud_account_id}"
        f"&project_id={project_id}"
        f"&name__icontains={job_name}"
    )

What do you think about such an improvement?
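The filtered URLs quoted in the comment can also be assembled with `urllib.parse.urlencode`, which keeps the query string well-formed if a name contains spaces or special characters. A minimal sketch, with the function name and parameter layout assumed rather than taken from the provider:

```python
from urllib.parse import urlencode


def filtered_jobs_url(base_url_v2: str, account_id: int, project_id: int, job_name: str) -> str:
    """Build a jobs listing URL that filters server-side, keeping the payload small."""
    # urlencode percent-escapes values, which manual f-string concatenation would not.
    query = urlencode({"project_id": project_id, "name__icontains": job_name})
    return f"{base_url_v2}accounts/{account_id}/jobs/?{query}"
```

For example, a job name containing a space is encoded safely instead of producing a malformed query string.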
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
jaklan commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2590120392

@joellabes another PR related to the dbt Cloud Operator from our side - could you have a look and check whether you are fine with the approach?
Re: [PR] New Optional dbt Cloud Job Operator Params [airflow]
boring-cyborg[bot] commented on PR #45634: URL: https://github.com/apache/airflow/pull/45634#issuecomment-2589560603

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)

Here are some useful points:

- Pay attention to the quality of your code (ruff, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#prerequisites-for-pre-commit-hooks) will help you with that.
- In case of a new feature add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
- Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/dev/breeze/doc/README.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
- Be patient and persistent. It might take some time to get a review or the final approval from Committers.
- Please follow the [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication, including (but not limited to) comments on Pull Requests, the Mailing list and Slack.
- Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#coding-style-and-best-practices).
- Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀.

In case of doubts contact the developers at: Mailing List: d...@airflow.apache.org Slack: https://s.apache.org/airflow-slack