kacpermuda opened a new issue, #37885:
URL: https://github.com/apache/airflow/issues/37885
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
>=10.2.0
### Apache Airflow version
main branch
### Operating System
MacOS
### Deployment
Other
### Deployment details
BREEZE
### What happened
[This PR](https://github.com/apache/airflow/pull/31758) optimized the
deferrable mode in `BigQueryCheckOperator` and introduced this bug. When the
operator runs in deferrable mode but the job finishes quickly enough that it is
never deferred (it does not enter the `if job.running()` branch), no error is
raised when the job fails and no value check is performed.
A similar optimization was made
[here](https://github.com/apache/airflow/pull/31872/files) and later fixed
[here](https://github.com/apache/airflow/pull/34018).
### What you think should happen instead
The operator should raise an error when the job fails and should check the
values returned by the job, even when it is not deferred.
I think something similar to what was done in `BigQueryValueCheckOperator`
should be enough:
```
# Inside the deferrable branch of BigQueryCheckOperator.execute():
if job.running():
    self.defer(
        timeout=self.execution_timeout,
        trigger=BigQueryCheckTrigger(
            conn_id=self.gcp_conn_id,
            job_id=job.job_id,
            project_id=hook.project_id,
            location=self.location or hook.location,
            poll_interval=self.poll_interval,
            impersonation_chain=self.impersonation_chain,
        ),
        method_name="execute_complete",
    )
# Reached only if the job finished before deferral: check for errors and values here.
self._handle_job_error(job)
# job.result() returns a RowIterator. Mypy expects an instance of SupportsNext[Any] for
# the next() call, which the RowIterator does not satisfy. Hence, ignore the arg-type error.
records = next(job.result())  # type: ignore[arg-type]
self._validate_records(records)  # type: ignore[attr-defined]
self.log.info("Current state of job %s is %s", job.job_id, job.state)


# Helper methods on the operator class:
@staticmethod
def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
    if job.error_result:
        raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")

def _validate_records(self, records) -> None:
    if not records:
        raise AirflowException("The query returned empty results")
    elif not all(records):
        self._raise_exception(  # type: ignore[attr-defined]
            f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}"
        )
```
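The value check in the proposed `_validate_records` relies on Python truthiness: a missing or empty first row is rejected, and any falsy value in the row (`0`, `None`, `False`, `""`) fails the check. The standalone sketch below mirrors that logic with a hypothetical `CheckFailed` exception in place of `AirflowException` and a hypothetical `check_outcome` helper, so the rule can be exercised without Airflow.

```python
from typing import Any, Optional, Sequence


class CheckFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def validate_records(records: Optional[Sequence[Any]], sql: str) -> None:
    # No row (or an empty row) means there is nothing to check.
    if not records:
        raise CheckFailed("The query returned empty results")
    # Every value in the row must be truthy; 0, None, False or "" fail the check.
    if not all(records):
        raise CheckFailed(f"Test failed.\nQuery:\n{sql}\nResults:\n{records!s}")


def check_outcome(records: Optional[Sequence[Any]], sql: str = "SELECT COUNT(*) FROM t") -> str:
    """Hypothetical helper: returns 'passed' or the first line of the failure message."""
    try:
        validate_records(records, sql)
    except CheckFailed as exc:
        return str(exc).splitlines()[0]
    return "passed"
```

For example, a row such as `(1, 5)` passes, while `(1, 0)` fails because the second value is falsy.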
### How to reproduce
Run a very quick query in deferrable mode. If the query fails, no error is
raised; if it succeeds, the returned values are not checked.
### Anything else
I'll make a PR in my free time, but maybe somebody will pick it up before then.
Also, there are no tests checking that the operators (both
`BigQueryCheckOperator` and `BigQueryValueCheckOperator`) raise an error when in
deferrable mode but not deferred, so something like this should be added:
```
from unittest import mock
from unittest.mock import MagicMock

import pytest

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator


# Method of the BigQueryValueCheckOperator test class:
@pytest.mark.db_test
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_value_check_operator_async_finish_with_error_before_deferred(
    self, mock_hook, create_task_instance_of_operator
):
    job_id = "123456"
    hash_ = "hash"
    real_job_id = f"{job_id}_{hash_}"
    # The submitted job has already finished (not running) and reports an error.
    mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
    mock_hook.return_value.insert_job.return_value.running.return_value = False

    ti = create_task_instance_of_operator(
        BigQueryValueCheckOperator,
        dag_id="dag_id",
        task_id="check_value",
        sql="SELECT COUNT(*) FROM Any",
        pass_value=2,
        use_legacy_sql=True,
        deferrable=True,
    )

    with pytest.raises(AirflowException) as exc:
        ti.task.execute(MagicMock())

    assert str(exc.value) == f"BigQuery job {real_job_id} failed: True"
```
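The mock wiring in the test above can be checked in isolation. Patching `BigQueryHook` replaces the class, so the hook instance the operator builds is `mock_hook.return_value`, and the job it submits is `insert_job.return_value`. Because `error_result=True`, the expected message ends in `failed: True`. The sketch below uses only `unittest.mock`. `execute_deferrable`, `handle_job_error` and `JobFailed` are hypothetical simplifications of the operator path, not Airflow code.

```python
from unittest.mock import MagicMock


class JobFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def handle_job_error(job) -> None:
    # Mirrors the proposed _handle_job_error: any truthy error_result is fatal.
    if job.error_result:
        raise JobFailed(f"BigQuery job {job.job_id} failed: {job.error_result}")


def execute_deferrable(hook_cls) -> str:
    """Hypothetical, simplified deferrable execute() path."""
    hook = hook_cls()  # with a patched class, this is mock_hook.return_value
    job = hook.insert_job()  # this is insert_job.return_value
    if job.running():
        return "deferred"
    handle_job_error(job)
    return "checked"


real_job_id = "123456_hash"
mock_hook = MagicMock()
# Same wiring as in the test: the job has finished and carries an error.
mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
mock_hook.return_value.insert_job.return_value.running.return_value = False
```

Setting `running.return_value = True` on the same mock exercises the deferral branch instead, which is the only path the existing tests cover.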
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)