Re: [PR] Add a check in BigQueryInsertJobOperator to verify the Job state before marking it as success [airflow]
eladkal commented on PR #40863:
URL: https://github.com/apache/airflow/pull/40863#issuecomment-2509739590

Suppressed by https://github.com/apache/airflow/pull/44279

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
eladkal closed pull request #40863: Add a check in BigQueryInsertJobOperator to verify the Job state before marking it as success
URL: https://github.com/apache/airflow/pull/40863
pankajastro commented on code in PR #40863:
URL: https://github.com/apache/airflow/pull/40863#discussion_r1862722528

## airflow/providers/google/cloud/operators/bigquery.py: ##
@@ -3021,7 +3021,15 @@ def execute(self, context: Any):
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
-            self._handle_job_error(job)
+            while True:
+                if job.state in ("PENDING", "RUNNING"):
+                    import time
+
+                    time.sleep(5)

Review Comment:
Yes, if we are looping, then sleep makes sense.
rawwar commented on code in PR #40863:
URL: https://github.com/apache/airflow/pull/40863#discussion_r1855189810

## airflow/providers/google/cloud/operators/bigquery.py: ##
@@ -3021,7 +3021,15 @@ def execute(self, context: Any):
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
-            self._handle_job_error(job)
+            while True:
+                if job.state in ("PENDING", "RUNNING"):
+                    import time
+
+                    time.sleep(5)

Review Comment:
@pankajastro, FYI for #44279
shahar1 commented on code in PR #40863:
URL: https://github.com/apache/airflow/pull/40863#discussion_r1855178999

## airflow/providers/google/cloud/operators/bigquery.py: ##
@@ -3021,7 +3021,15 @@ def execute(self, context: Any):
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
-            self._handle_job_error(job)
+            while True:
+                if job.state in ("PENDING", "RUNNING"):
+                    import time
+
+                    time.sleep(5)

Review Comment:
@RNHTTR @kandharvishnu On second thought, maybe it's better to have a `time.sleep()`. Otherwise, if the job stays pending or running for a long time, requests will be sent immediately one after another, which might exceed the API rate limit. Example from another Google provider: https://github.com/apache/airflow/blob/3c58e01266f884544fdebc70f92b63848c610d2d/providers/src/airflow/providers/google/cloud/hooks/dataproc.py#L1241

What do you think?
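The rate-limit concern can be made concrete with a small polling helper. This is a minimal sketch, not the operator's code: it assumes a job object with a `state` attribute and a `reload()` method that refreshes it (google-cloud-bigquery job objects expose both), but hypothetical `FakeJob` and `FakeClock` classes stand in here so the example runs offline and instantly. The `poll_interval`, `timeout`, `sleep` and `clock` parameters are illustrative; the 5-second default mirrors the `time.sleep(5)` in the diff.

```python
import time
from typing import Callable

ACTIVE_STATES = ("PENDING", "RUNNING")


def wait_until_done(
    job,
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poll until job.state leaves PENDING/RUNNING; return how many refresh requests were made."""
    deadline = clock() + timeout
    requests_made = 0
    while job.state in ACTIVE_STATES:
        if clock() >= deadline:
            raise TimeoutError(f"job still {job.state} after {timeout} s")
        sleep(poll_interval)  # pause between polls instead of firing requests back to back
        job.reload()  # refresh the cached state: one API request per iteration
        requests_made += 1
    return requests_made


class FakeClock:
    """Hypothetical simulated clock: sleep() advances time without waiting."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


class FakeJob:
    """Hypothetical job that reports DONE once the simulated clock reaches finishes_at."""

    def __init__(self, clock: FakeClock, finishes_at: float) -> None:
        self._clock = clock
        self._finishes_at = finishes_at
        self.state = "RUNNING"

    def reload(self) -> None:
        if self._clock.now >= self._finishes_at:
            self.state = "DONE"


clock = FakeClock()
job = FakeJob(clock, finishes_at=30.0)
print(wait_until_done(job, sleep=clock.sleep, clock=clock.time))  # 6
```

With a fixed interval, the number of requests is roughly the job's run time divided by `poll_interval` (6 for a 30-second job at 5-second intervals here). Without the sleep, the request rate is bounded only by request latency, which is the rate-limit risk raised in the comment. A capped exponential backoff is a common alternative to a fixed interval.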
kandharvishnu commented on code in PR #40863:
URL: https://github.com/apache/airflow/pull/40863#discussion_r1853297545

## airflow/providers/google/cloud/operators/bigquery.py: ##
@@ -3021,7 +3021,15 @@ def execute(self, context: Any):
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
-            self._handle_job_error(job)
+            while True:
+                if job.state in ("PENDING", "RUNNING"):
+                    import time
+
+                    time.sleep(5)
+                    job.result(timeout=self.result_timeout, retry=self.result_retry)

Review Comment:
`job.result` will not trigger a new job. `_submit_job` ([link](https://github.com/kandharvishnu/airflow/blob/fc52d7d127975097734217888c133923a9c4a5ec/providers/src/airflow/providers/google/cloud/operators/bigquery.py#L2621)) is what triggers the job.
github-actions[bot] closed pull request #40863: Add a check in BigQueryInsertJobOperator to verify the Job state before marking it as success
URL: https://github.com/apache/airflow/pull/40863
github-actions[bot] commented on PR #40863:
URL: https://github.com/apache/airflow/pull/40863#issuecomment-2351849846

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
RNHTTR commented on code in PR #40863:
URL: https://github.com/apache/airflow/pull/40863#discussion_r1686706553

## airflow/providers/google/cloud/operators/bigquery.py: ##
@@ -3021,7 +3021,15 @@ def execute(self, context: Any):
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
-            self._handle_job_error(job)
+            while True:
+                if job.state in ("PENDING", "RUNNING"):
+                    import time
+
+                    time.sleep(5)

Review Comment:
I don't think we need to add a sleep here. Why not do...

```python
while job.state in ("PENDING", "RUNNING"):
    ...
```
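The suggestion moves the exit test into the `while` condition. The sketch below combines that loop shape with the check this PR is about: after the loop, only a job that ended in `DONE` with no error result is treated as a success. It is a hedged illustration, not Airflow's `_handle_job_error`: `JobNotSuccessful`, `FakeJob` and `no_wait` are hypothetical, and the job is assumed to expose `state`, `reload()` and `error_result` (names that also exist on google-cloud-bigquery job objects). A sleep between polls is kept, following the later rate-limit discussion; the `sleep` parameter lets the demo run without waiting.

```python
import time
from typing import Callable

ACTIVE_STATES = ("PENDING", "RUNNING")


class JobNotSuccessful(Exception):
    """Hypothetical exception; Airflow's operator raises its own error type."""


def wait_and_verify(
    job, poll_interval: float = 5.0, sleep: Callable[[float], None] = time.sleep
) -> None:
    # The exit test lives in the loop condition, so no `while True` / `break` pair is needed.
    while job.state in ACTIVE_STATES:
        sleep(poll_interval)
        job.reload()  # refresh job.state from the service
    # Only a job that finished in DONE without an error result counts as a success.
    if job.state != "DONE":
        raise JobNotSuccessful(f"unexpected final state {job.state!r}")
    if job.error_result:
        raise JobNotSuccessful(f"job failed: {job.error_result}")


class FakeJob:
    """Hypothetical stand-in that steps through a scripted list of states on each reload()."""

    def __init__(self, states, error_result=None):
        self._states = list(states)
        self.state = self._states.pop(0)
        self.error_result = error_result

    def reload(self):
        if self._states:
            self.state = self._states.pop(0)


def no_wait(seconds: float) -> None:
    """Skip real sleeping in the demo."""


ok = FakeJob(["PENDING", "RUNNING", "DONE"])
wait_and_verify(ok, sleep=no_wait)
print(ok.state)  # DONE

failed = FakeJob(["RUNNING", "DONE"], error_result={"reason": "invalidQuery"})
try:
    wait_and_verify(failed, sleep=no_wait)
except JobNotSuccessful as exc:
    print(exc)  # job failed: {'reason': 'invalidQuery'}
```

For brevity this loop has no overall timeout; a real implementation would bound it, since a job that never leaves `RUNNING` would otherwise poll forever.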
RNHTTR commented on code in PR #40863:
URL: https://github.com/apache/airflow/pull/40863#discussion_r1686606136

## airflow/providers/google/cloud/operators/bigquery.py: ##
@@ -3021,7 +3021,15 @@ def execute(self, context: Any):
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
-            self._handle_job_error(job)
+            while True:

Review Comment:
I'd like to hear the Google team's opinion, ~but having another look at [QueryJob's result method](https://github.com/googleapis/python-bigquery/blob/ba61a8ab0da541ba1940211875d7ea2e9e17dfa8/google/cloud/bigquery/job/query.py#L1673-L1690), I'm not sure this is even necessary, because `result` now has logic to determine whether or not a job is `DONE`.~ [Airflow has pinned google-cloud-bigquery to < 3.21.0](https://github.com/apache/airflow/pull/39583/files), so the `is_job_done` logic is not included, because the [latest `google-cloud-bigquery` version before `3.21.0` does not have it](https://github.com/googleapis/python-bigquery/blob/v3.20.1/google/cloud/bigquery/job/query.py#L1467).
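Whether a given environment falls under that pin can be checked at runtime. This sketch uses only the standard library (`importlib.metadata.version`) to read the installed `google-cloud-bigquery` version and compares its numeric release part against 3.21.0. The cutoff is an assumption taken from the comment (the pin is `< 3.21.0` and v3.20.1 lacks the `is_job_done` logic); it has not been checked against the library's changelog. Pre-release suffixes such as `rc1` are simply dropped, and all helper names are hypothetical.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version

# Assumption from the review comment: releases below 3.21.0 lack the
# `is_job_done` handling in QueryJob.result().
ASSUMED_FIRST_RELEASE_WITH_IS_JOB_DONE = (3, 21, 0)


def release_tuple(version_string: str) -> tuple[int, ...]:
    """Parse the numeric release part, e.g. '3.20.1' -> (3, 20, 1); suffixes like 'rc1' are dropped."""
    parts: list[int] = []
    for piece in version_string.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if len(digits) != len(piece):  # stop after a component such as '0rc1'
            break
    while len(parts) < 3:  # treat '3.21' as '3.21.0'
        parts.append(0)
    return tuple(parts)


def has_is_job_done_logic(version_string: str) -> bool:
    return release_tuple(version_string) >= ASSUMED_FIRST_RELEASE_WITH_IS_JOB_DONE


def installed_bigquery_version() -> str | None:
    """Installed google-cloud-bigquery version, or None if the package is absent."""
    try:
        return version("google-cloud-bigquery")
    except PackageNotFoundError:
        return None


for candidate in ("3.20.1", "3.21.0"):
    print(candidate, has_is_job_done_logic(candidate))
# 3.20.1 False
# 3.21.0 True
print("installed:", installed_bigquery_version())
```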
RNHTTR commented on code in PR #40863:
URL: https://github.com/apache/airflow/pull/40863#discussion_r1686602738

## airflow/providers/google/cloud/operators/bigquery.py: ##
@@ -3021,7 +3021,15 @@ def execute(self, context: Any):
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
-            self._handle_job_error(job)
+            while True:
+                if job.state in ("PENDING", "RUNNING"):
+                    import time
+
+                    time.sleep(5)
+                    job.result(timeout=self.result_timeout, retry=self.result_retry)

Review Comment:
Will calling `job.result` trigger a new job?

## airflow/providers/google/cloud/operators/bigquery.py: ##
@@ -3021,7 +3021,15 @@ def execute(self, context: Any):
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
-            self._handle_job_error(job)
+            while True:

Review Comment:
I'd like to hear the Google team's opinion, but having another look at [QueryJob's result method](https://github.com/googleapis/python-bigquery/blob/ba61a8ab0da541ba1940211875d7ea2e9e17dfa8/google/cloud/bigquery/job/query.py#L1673-L1690), I'm not sure this is even necessary, because `result` now has logic to determine whether or not a job is `DONE`.