Re: [I] BigQueryCheckOperator skips value and error check when in deferrable mode but not deferred [airflow]

2024-04-19 Thread via GitHub


potiuk closed issue #37885: BigQueryCheckOperator skips value and error check 
when in deferrable mode but not deferred
URL: https://github.com/apache/airflow/issues/37885


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] BigQueryCheckOperator skips value and error check when in deferrable mode but not deferred [airflow]

2024-03-04 Thread via GitHub


boring-cyborg[bot] commented on issue #37885:
URL: https://github.com/apache/airflow/issues/37885#issuecomment-1977048313

   Thanks for opening your first issue here! Be sure to follow the issue 
template! If you are willing to raise PR to address this issue please do so, no 
need to wait for approval.
   





[I] BigQueryCheckOperator skips value and error check when in deferrable mode but not deferred [airflow]

2024-03-04 Thread via GitHub


kacpermuda opened a new issue, #37885:
URL: https://github.com/apache/airflow/issues/37885

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   >=10.2.0
   
   ### Apache Airflow version
   
   main branch
   
   ### Operating System
   
   MacOS
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   BREEZE
   
   ### What happened
   
   [This PR](https://github.com/apache/airflow/pull/31758) optimized the deferrable mode in `BigQueryCheckOperator` and introduced this bug. When the operator runs in deferrable mode but the job finishes quickly enough that it is never deferred (execution does not enter the `if job.running()` branch), no error is raised when the job fails and no value check is performed.
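   The failure mode can be reproduced without Airflow or BigQuery. The following is a simplified model, not the provider code: `TaskDeferred` is a local stand-in for the exception that Airflow's `BaseOperator.defer()` raises, and `FakeJob` and `buggy_execute` are hypothetical names. They mimic only the branch structure described above.

```python
from dataclasses import dataclass
from typing import Any, Optional


class TaskDeferred(Exception):
    """Local stand-in for Airflow's TaskDeferred, raised by BaseOperator.defer()."""


@dataclass
class FakeJob:
    """Hypothetical stand-in for a BigQuery job (not the google-cloud-bigquery API)."""

    job_id: str
    is_running: bool
    error_result: Optional[Any] = None

    def running(self) -> bool:
        return self.is_running


def buggy_execute(job: FakeJob) -> None:
    """Simplified model of the operator's control flow before the fix."""
    if job.running():
        # Deferred path: error and value checks run later, in execute_complete().
        raise TaskDeferred(job.job_id)
    # Non-deferred path: nothing inspects job.error_result or the returned rows,
    # so the task returns normally even though the job failed.


# A job that failed before the running() check: no exception is raised.
buggy_execute(FakeJob("123456_hash", is_running=False, error_result="boom"))
```

   Only the deferred branch ever reaches the checks. A job that is already done when `running()` is called falls straight through.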
   
   A similar optimization was made [here](https://github.com/apache/airflow/pull/31872/files) and later fixed [here](https://github.com/apache/airflow/pull/34018).
   
   ### What you think should happen instead
   
   The operator should raise an error when the job fails and should check the value returned by the job, even when it is not deferred.
   
   I think something like this, similar to what was done in `BigQueryValueCheckOperator`, should be enough:
   ```python
   # In BigQueryCheckOperator.execute(), after the job has been submitted:
   if job.running():
       # defer() raises TaskDeferred, so the lines below only run if the job has already finished.
       self.defer(
           timeout=self.execution_timeout,
           trigger=BigQueryCheckTrigger(
               conn_id=self.gcp_conn_id,
               job_id=job.job_id,
               project_id=hook.project_id,
               location=self.location or hook.location,
               poll_interval=self.poll_interval,
               impersonation_chain=self.impersonation_chain,
           ),
           method_name="execute_complete",
       )
   self._handle_job_error(job)
   # job.result() returns a RowIterator. Mypy expects an instance of SupportsNext[Any] for
   # the next() call, which RowIterator does not satisfy. Hence, ignore the arg-type error.
   records = next(job.result())  # type: ignore[arg-type]
   self._validate_records(records)  # type: ignore[attr-defined]
   self.log.info("Current state of job %s is %s", job.job_id, job.state)

   # New helper methods on BigQueryCheckOperator:
   @staticmethod
   def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
       if job.error_result:
           raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")

   def _validate_records(self, records) -> None:
       if not records:
           raise AirflowException("The query returned empty results")
       elif not all(records):
           self._raise_exception(  # type: ignore[attr-defined]
               f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}"
           )
   ```
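   The two helpers above apply their checks once the job is known to be finished. The standalone sketch below models them as a single function. `check_finished_job` and `CheckFailed` are hypothetical names used only for illustration, with `CheckFailed` standing in for `AirflowException`. The sketch also assumes that `records` is the first result row as a plain tuple. Following the `SQLCheckOperator` semantics, each value in that row is cast to `bool`, so a single `0`, `None`, `""` or `False` fails the check.

```python
from typing import Any, Optional, Sequence


class CheckFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def check_finished_job(
    job_id: str,
    error_result: Optional[Any],
    records: Optional[Sequence[Any]],
    sql: str,
) -> None:
    """Model of _handle_job_error() followed by _validate_records()."""
    # A job that reports an error must fail the task.
    if error_result:
        raise CheckFailed(f"BigQuery job {job_id} failed: {error_result}")
    # A missing or empty first row is an error.
    if not records:
        raise CheckFailed("The query returned empty results")
    # Every value is cast to bool; any falsy value fails the check.
    if not all(records):
        raise CheckFailed(f"Test failed.\nQuery:\n{sql}\nResults:\n{records!s}")


# Passes: the job succeeded and every value in the row is truthy.
check_finished_job("123456_hash", None, (3, "ok"), "SELECT 3, 'ok'")
```

   With `error_result=True` the message is `BigQuery job 123456_hash failed: True`, the same string the test proposed below asserts on.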
   
   ### How to reproduce
   
   Run a very quick query in deferrable mode. If it fails, no error is raised; if it succeeds, the values are not checked.
   
   ### Anything else
   
   I'll make a PR in my free time, but maybe somebody will pick it up before then.
   
   Also, there are no tests checking that the operators (both `BigQueryCheckOperator` and `BigQueryValueCheckOperator`) raise an error when in deferrable mode but not deferred, so something like this should be added:
   
   ```python
   @pytest.mark.db_test
   @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
   def test_bigquery_value_check_operator_async_finish_with_error_before_deferred(
       self, mock_hook, create_task_instance_of_operator
   ):
       job_id = "123456"
       hash_ = "hash"
       real_job_id = f"{job_id}_{hash_}"

       # The job has already finished (running() is False) and reports an error.
       mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
       mock_hook.return_value.insert_job.return_value.running.return_value = False

       ti = create_task_instance_of_operator(
           BigQueryValueCheckOperator,
           dag_id="dag_id",
           task_id="check_value",
           sql="SELECT COUNT(*) FROM Any",
           pass_value=2,
           use_legacy_sql=True,
           deferrable=True,
       )

       with pytest.raises(AirflowException) as exc:
           ti.task.execute(MagicMock())

       assert str(exc.value) == f"BigQuery job {real_job_id} failed: True"
   ```
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

