(airflow) branch main updated (6781c632e3 -> d08f893f25)
This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git

    from 6781c632e3 doc: dynamictaskmapping pythonoperator op_kwargs (#39242)
     add d08f893f25 Expose AWS IAM missing param in Hashicorp secret (#38536)

No new revisions were added by this update.

Summary of changes:
 .../hashicorp/_internal_client/vault_client.py | 40 +-
 airflow/providers/hashicorp/provider.yaml      |  6
 airflow/providers/hashicorp/secrets/vault.py   |  3 ++
 .../secrets-backends/hashicorp-vault.rst       | 12 +++
 4 files changed, 53 insertions(+), 8 deletions(-)
Re: [PR] Expose AWS IAM missing param in Hashicorp secret [airflow]
pankajastro merged PR #38536: URL: https://github.com/apache/airflow/pull/38536 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Fix trigger kwarg encryption migration [airflow]
jedcunningham commented on code in PR #39246: URL: https://github.com/apache/airflow/pull/39246#discussion_r1578894900

## airflow/migrations/versions/0140_2_9_0_update_trigger_kwargs_type.py:

@@ -38,13 +41,43 @@
 airflow_version = "2.9.0"

+def get_session() -> sa.orm.Session:
+    conn = op.get_bind()
+    sessionmaker = sa.orm.sessionmaker()
+    return sessionmaker(bind=conn)
+
 def upgrade():
-    """Update trigger kwargs type to string"""
+    """Update trigger kwargs type to string and encrypt"""
     with op.batch_alter_table("trigger") as batch_op:
         batch_op.alter_column("kwargs", type_=sa.Text(), )
+    if not context.is_offline_mode():
+        session = get_session()
+        try:
+            for trigger in session.query(Trigger):
+                trigger.kwargs = trigger.kwargs
+            session.commit()
+        finally:
+            session.close()

 def downgrade():
-    """Unapply update trigger kwargs type to string"""
+    """Unapply update trigger kwargs type to string and encrypt"""
+    if context.is_offline_mode():
+        print(dedent("""
+            -- WARNING: Unable to decrypt trigger kwargs automatically in offline mode!

Review Comment: I don't think we can do any better than spitting out a warning :(. But if folks have ideas here, happy to hear them.

## airflow/models/trigger.py:

@@ -116,7 +116,15 @@
 def _decrypt_kwargs(encrypted_kwargs: str) -> dict[str, Any]:
     from airflow.models.crypto import get_fernet
     from airflow.serialization.serialized_objects import BaseSerialization

-    decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
+    # We weren't able to encrypt the kwargs in all migration paths,
+    # so we need to handle the case where they are not encrypted.
+    # Triggers aren't long lasting, so we can skip encrypting them now.

Review Comment: We could start doing this if we'd like to, but I'd suggest we do it in a follow-up PR.
## tests/models/test_trigger.py:

@@ -378,3 +380,19 @@
 assert isinstance(trigger_row.encrypted_kwargs, str)
 assert "value1" not in trigger_row.encrypted_kwargs
 assert "value2" not in trigger_row.encrypted_kwargs
+
+
+def test_kwargs_not_encrypted():
+    """
+    Tests that we don't decrypt kwargs if they aren't encrypted.
+    We weren't able to encrypt the kwargs in all migration paths.
+    """
+    trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    # force the `encrypted_kwargs` to be unencrypted, like they would be after an upgrade
+    trigger.encrypted_kwargs = json.dumps(
+        BaseSerialization.serialize({"param1": "value1", "param2": "value2"})
+    )
+    print(trigger.encrypted_kwargs)

Review Comment:
```suggestion
```
Nothing to see here 🤦

## airflow/models/trigger.py:

@@ -116,7 +116,15 @@
 def _decrypt_kwargs(encrypted_kwargs: str) -> dict[str, Any]:
     from airflow.models.crypto import get_fernet
     from airflow.serialization.serialized_objects import BaseSerialization

-    decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
+    # We weren't able to encrypt the kwargs in all migration paths,
+    # so we need to handle the case where they are not encrypted.
+    # Triggers aren't long lasting, so we can skip encrypting them now.
+    if encrypted_kwargs.startswith("{"):
+        decrypted_kwargs = json.loads(encrypted_kwargs)

Review Comment: This fixes the offline upgrade path by detecting the kwargs aren't actually encrypted.

## tests/models/test_trigger.py:

@@ -378,3 +380,19 @@
+    trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    # force the `encrypted_kwargs` to be unencrypted, like they would be after an upgrade

Review Comment:
```suggestion
    # force the `encrypted_kwargs` to be unencrypted, like they would be after an offline upgrade
```
Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]
jedcunningham commented on PR #38876: URL: https://github.com/apache/airflow/pull/38876#issuecomment-2076404146

I've opened #39246 as an alternative solution to this.
[PR] Fix trigger kwarg encryption migration [airflow]
jedcunningham opened a new pull request, #39246: URL: https://github.com/apache/airflow/pull/39246

Closes: #38836
Alternative to #38876

Do the encryption in the migration itself, and fix support for offline migrations as well. The offline up migration won't actually encrypt the trigger kwargs, as there isn't a safe way to accomplish that, so the decryption process checks and short-circuits if the value isn't encrypted. The offline down migration will now print a warning that the offline migration will fail if there are any running triggers. I think this is the best we can do for that scenario (and folks willing to do offline migrations will hopefully be able to understand the situation).

This also solves the "encrypting the already encrypted kwargs" bug in 2.9.0.
Re: [PR] [FEAT] added notebook error in databricks deferrable handler [airflow]
Lee-W commented on PR #39110: URL: https://github.com/apache/airflow/pull/39110#issuecomment-2076387313

I think we're good to merge this one. Just want to see whether @pankajkoti has some time to double-check if I missed anything. I'm going to merge this later today. Please let me know if anyone wants to take a deeper look.
Re: [PR] [FEAT] added notebook error in databricks deferrable handler [airflow]
gaurav7261 commented on PR #39110: URL: https://github.com/apache/airflow/pull/39110#issuecomment-2076349163

Hi @pankajkoti, can you please review?
(airflow) branch main updated: doc: dynamictaskmapping pythonoperator op_kwargs (#39242)
This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new 6781c632e3 doc: dynamictaskmapping pythonoperator op_kwargs (#39242)

6781c632e3 is described below

commit 6781c632e3ba7fd4404f21c8b932e1374a4a9322
Author: raphaelauv
AuthorDate: Thu Apr 25 06:40:06 2024 +0200

    doc: dynamictaskmapping pythonoperator op_kwargs (#39242)

    Co-authored-by: raphaelauv
---
 .../authoring-and-scheduling/dynamic-task-mapping.rst | 18 ++
 1 file changed, 18 insertions(+)

diff --git a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
index acf9cfba3d..739bc6fee9 100644
--- a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
@@ -291,6 +291,24 @@
 This produces two task instances at run-time printing ``1`` and ``2`` respectively.

+Also it's possible to mix ``expand_kwargs`` with most of the operators arguments like the ``op_kwargs`` of the PythonOperator
+
+.. code-block:: python
+
+    def print_args(x, y):
+        print(x)
+        print(y)
+        return x + y
+
+
+    PythonOperator.partial(task_id="task-1", python_callable=print_args).expand_kwargs(
+        [
+            {"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
+            {"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
+        ]
+    )
+
+
 Similar to ``expand``, you can also map against a XCom that returns a list of dicts, or a list of XComs each returning a dict. Re-using the S3 example above, you can use a mapped task to perform "branching" and copy files to different buckets:

 .. code-block:: python
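The mapping semantics shown in that doc change can be illustrated without a scheduler: each dict passed to ``expand_kwargs`` becomes one task instance's operator arguments. The sketch below is a plain-Python simulation of that behavior (``simulate_expand_kwargs`` is an illustrative helper, not Airflow's implementation, and it only interprets the ``op_kwargs`` key):

```python
def print_args(x, y):
    print(x)
    print(y)
    return x + y


def simulate_expand_kwargs(python_callable, expand_input):
    # One simulated task instance per dict; only "op_kwargs" is interpreted
    # here, mirroring how PythonOperator forwards them to the callable.
    return [python_callable(**mapped["op_kwargs"]) for mapped in expand_input]


results = simulate_expand_kwargs(
    print_args,
    [
        {"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
        {"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
    ],
)
assert results == [3, 7]  # two "task instances", one result each
```

In real Airflow, the non-``op_kwargs`` keys (like ``show_return_value_in_logs``) would be applied as regular operator arguments on each mapped task instance.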
Re: [PR] doc: dynamictaskmapping pythonoperator op_kwargs [airflow]
uranusjr merged PR #39242: URL: https://github.com/apache/airflow/pull/39242
Re: [PR] Resolve deprecations in Python sensors/operators/decorators [airflow]
uranusjr commented on code in PR #39241: URL: https://github.com/apache/airflow/pull/39241#discussion_r1578832517

## tests/deprecations_ignore.yml:

@@ -349,23 +313,8 @@
 - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_execute_on_failure_callbacks_without_dag
 - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_failure_callbacks_should_not_drop_hostname
 - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_process_file_should_failure_callback
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_fail_multiple_outputs_key_type
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_fail_multiple_outputs_no_dict
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_manual_multiple_outputs_false_with_typings
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_empty_dict
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_ignore_typing
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_return_none
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_python_callable_arguments_are_templatized
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_python_callable_keyword_arguments_are_templatized
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_xcom_arg
+# Inlets/Outlets use TaskInstance.xcom_push() with deprecated arguments in airflow/lineage/__init__.py

Review Comment: Why not fix these then? They seem pretty trivial.
Re: [PR] Add retry logic for KubernetesCreateResourceOperator and KubernetesJobOperator [airflow]
dirrao commented on code in PR #39201: URL: https://github.com/apache/airflow/pull/39201#discussion_r1578826472

## airflow/providers/cncf/kubernetes/hooks/kubernetes.py:

@@ -486,6 +488,12 @@
 def get_deployment_status(
     ...
         except Exception as exc:
             raise exc

+    @tenacity.retry(

Review Comment: This will be retried for non-transient errors as well. Can't we rely on the task's own retries instead of an explicit retry here?
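The reviewer's concern is that a blanket retry decorator also retries permanent failures. One common answer is to restrict retries to a whitelist of transient exception types. The minimal pure-Python decorator below illustrates that idea (it is an illustrative sketch, not the provider's actual code; tenacity expresses the same thing with `retry=retry_if_exception_type(...)`):

```python
import functools


class TransientApiError(Exception):
    """Stand-in for a retryable error, e.g. a transient API/network failure."""


def retry_transient(max_attempts=3, transient=(TransientApiError,)):
    # Retry only the exception types listed in `transient`; anything else
    # (a permanent error such as a bad request) propagates immediately.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except transient:
                    if attempt == max_attempts:
                        raise
        return wrapper
    return decorator


calls = {"n": 0}


@retry_transient(max_attempts=3)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientApiError("temporary blip")
    return "ok"


assert flaky() == "ok" and calls["n"] == 3  # succeeded on the third attempt


@retry_transient(max_attempts=3)
def bad_request():
    raise ValueError("non-transient: fails immediately, no retries")
```

Calling `bad_request()` raises `ValueError` on the first attempt, which is the behavior the reviewer is asking for instead of retrying everything.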
Re: [PR] fix: sqa deprecations for airflow core [airflow]
uranusjr commented on code in PR #39211: URL: https://github.com/apache/airflow/pull/39211#discussion_r1578826032

## airflow/www/views.py:

@@ -3294,7 +3292,7 @@
 latest_run = dag_model.get_last_dagrun(session=session)
 events = [
-    dict(info)
+    dict(info._mapping)

Review Comment: I assume this attribute is officially supported, although it looks private.
Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]
dirrao commented on code in PR #39177: URL: https://github.com/apache/airflow/pull/39177#discussion_r1578824340

## airflow/utils/log/file_task_handler.py:

@@ -366,10 +366,7 @@
 executor_messages: list[str] = []
 executor_logs: list[str] = []
 served_logs: list[str] = []
-        is_running = ti.try_number == try_number and ti.state in (
-            TaskInstanceState.RUNNING,
-            TaskInstanceState.DEFERRED,

Review Comment: When the task is deferred, are the logs served from the triggerer?
Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]
jedcunningham commented on PR #38876: URL: https://github.com/apache/airflow/pull/38876#issuecomment-2076305345

Currently, downgrade also doesn't work when you have triggers: Alembic tries to switch the column back to JSON before decryption happens.
Re: [PR] added a new condition before launching the self._scan_stale_dags() fu… [airflow]
uranusjr commented on code in PR #39159: URL: https://github.com/apache/airflow/pull/39159#discussion_r1578810984

## docs/apache-airflow/core-concepts/dags.rst:

@@ -924,3 +924,10 @@
 if it fails for ``N`` number of times consecutively. we can also provide and override these configuration from DAG argument:

 - ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
+
+Disable deletion of stale dags

Review Comment:
```suggestion
Disable deletion of stale DAGs
```
Re: [PR] added a new condition before launching the self._scan_stale_dags() fu… [airflow]
uranusjr commented on code in PR #39159: URL: https://github.com/apache/airflow/pull/39159#discussion_r1578810901

## airflow/dag_processing/manager.py:

@@ -599,7 +599,10 @@
 if self.standalone_dag_processor:
     self._fetch_callbacks(max_callbacks_per_loop)
-    self._scan_stale_dags()
+
+    # this variable gives us flexibility to purge stale dags or not.

Review Comment: This is obvious from the actual logic; the comment is quite redundant.
Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]
uranusjr commented on code in PR #39244: URL: https://github.com/apache/airflow/pull/39244#discussion_r1578810106

## airflow/cli/commands/task_command.py:

@@ -199,6 +199,8 @@
     )
     # TODO: Validate map_index is in range?
     ti = TaskInstance(task, run_id=dag_run.run_id, map_index=map_index)
+    if create_if_necessary == "db":
+        session.add(ti)

Review Comment: Would this not just delay this until when we need to handle `create_if_necessary == "memory"`?
[PR] Amazon Bedrock - Knowledge Bases and Data Sources [airflow]
ferruzzi opened a new pull request, #39245: URL: https://github.com/apache/airflow/pull/39245

Adds hooks, operators, sensors, triggers, and waiters to enable support for Amazon Bedrock Knowledge Bases. This includes Bedrock Data Sources and Ingestion Jobs, and some light additions (a thin hook, and a waiter/sensor/trigger set) for Amazon OpenSearch Serverless which are needed to make the system test work end-to-end.

System test and unit tests are included, as well as docs. Static checks, unit tests, and build-docs all pass locally.

---

**^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information. In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
Re: [PR] Add tests for EmrServerlessJobSensor [airflow]
mateuslatrova commented on PR #39099: URL: https://github.com/apache/airflow/pull/39099#issuecomment-2076129433

Hi @dirrao! Just fixed the tests for EmrServerlessJobSensor; take a look whenever you can. I decided to do a separate PR with the tests for the EmrServerlessApplicationSensor. Thanks in advance!
Re: [PR] Added JOB_STATE_CANCELLED and pool_sleep GCP Dataflow Operators [airflow]
github-actions[bot] closed pull request #37364: Added JOB_STATE_CANCELLED and pool_sleep GCP Dataflow Operators URL: https://github.com/apache/airflow/pull/37364
Re: [PR] feat/log null bytes file [airflow]
github-actions[bot] closed pull request #37894: feat/log null bytes file URL: https://github.com/apache/airflow/pull/37894
Re: [PR] Raising Exception if dag is not in active state while triggering dag [airflow]
github-actions[bot] commented on PR #37836: URL: https://github.com/apache/airflow/pull/37836#issuecomment-2076076835

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]
dstandish commented on PR #38876: URL: https://github.com/apache/airflow/pull/38876#issuecomment-2076067502

> > Should we consider yanking 2.9.0 once 2.9.1 is out with this fix?
>
> I don't think so. In my point of view, if we wanted to yank then we should have merged this quickly and cut 2.9.1 immediately as a critical bug fix. I asked about this when the issue was discovered, but others thought we should wait for more bug fixes as this was not categorized as critical.

To me, I'm not sure whether it meets the criteria of critical, but it's pretty bad. If the user is using the Helm chart, it will run the migration a lot, and if there are triggers in flight, this could happen and sorta bork the cluster.
Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]
jedcunningham commented on PR #38876: URL: https://github.com/apache/airflow/pull/38876#issuecomment-2076010902

Just confirmed my suspicions, it doesn't work for offline migrations:

```
[2024-04-24T23:08:17.337+] {triggerer_job_runner.py:341} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 339, in _execute
    self._run_trigger_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 362, in _run_trigger_loop
    self.load_triggers()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 377, in load_triggers
    self.trigger_runner.update_triggers(set(ids))
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 676, in update_triggers
    new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 93, in kwargs
    return self._decrypt_kwargs(self.encrypted_kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 119, in _decrypt_kwargs
    decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
  File "/home/airflow/.local/lib/python3.12/site-packages/cryptography/fernet.py", line 211, in decrypt
    raise InvalidToken
cryptography.fernet.InvalidToken
```

This was all that was spit out for that migration:

```
-- Running upgrade ee1467d4aa35 -> 1949afb29106

ALTER TABLE trigger ALTER COLUMN kwargs TYPE TEXT;

UPDATE alembic_version SET version_num='1949afb29106' WHERE alembic_version.version_num = 'ee1467d4aa35';
```
Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]
jedcunningham commented on PR #38876: URL: https://github.com/apache/airflow/pull/38876#issuecomment-2075998782

Ignoring that the conditional is wrong, I don't follow how this migration approach works for offline migration in the first place. Am I missing functionality somewhere that solves that scenario?
[PR] fix: sqa deprecations for airflow task cmd [airflow]
dondaum opened a new pull request, #39244: URL: https://github.com/apache/airflow/pull/39244

related: #28723

Fix deprecations for SQLAlchemy 2.0 for the Airflow core task command. SQLAlchemy 2.0 changes the behavior when an object is merged into a Session along the backref cascade. Until SQLAlchemy 1.4, and assuming a bidirectional relationship between a TaskInstance and a DagRun, if a DagRun object is already in a Session, the TaskInstance object gets put into the Session as well. This behavior is deprecated for [removal](https://docs.sqlalchemy.org/en/20/changelog/migration_14.html#cascade-backrefs-behavior-deprecated-for-removal-in-2-0) in SQLAlchemy 2.0. In order to maintain the actual behavior and to fix the warning, we need to ensure that both objects are either not in the session or are in the session when they are associated with each other. See [here](https://github.com/sqlalchemy/sqlalchemy/discussions/7693) for more information.

### Reported in core

- [x] airflow/cli/commands/task_command.py:202
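The fix pattern described above (add the object to the session explicitly instead of relying on the deprecated backref cascade) can be sketched with simplified stand-in models. These are not Airflow's real `DagRun`/`TaskInstance` mappings, just a minimal bidirectional relationship against an in-memory SQLite database:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class DagRun(Base):  # simplified stand-in, not Airflow's model
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    run_id = Column(String, nullable=False)
    task_instances = relationship("TaskInstance", back_populates="dag_run")


class TaskInstance(Base):  # simplified stand-in, not Airflow's model
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_run_id = Column(Integer, ForeignKey("dag_run.id"))
    dag_run = relationship("DagRun", back_populates="task_instances")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    dag_run = DagRun(run_id="manual__2024-04-25")
    session.add(dag_run)
    ti = TaskInstance()
    session.add(ti)       # explicit add: don't rely on the backref cascade
    ti.dag_run = dag_run  # associate after both objects are in the session
    session.commit()
    assert ti.dag_run_id == dag_run.id
```

Under SQLAlchemy 1.4, associating `ti` with an in-session `dag_run` would have cascaded `ti` into the session implicitly (with a deprecation warning); the explicit `session.add(ti)` keeps the same end state without depending on that removed behavior.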
Re: [I] KubernetesPodOperator duplicating logs when interrupted [airflow]
tirkarthi commented on issue #39236: URL: https://github.com/apache/airflow/issues/39236#issuecomment-2075830089

Related https://github.com/apache/airflow/issues/33498
Re: [I] front - admin menu - drop-down non deterministic [airflow]
jscheffl commented on issue #39135: URL: https://github.com/apache/airflow/issues/39135#issuecomment-2075813598

@potiuk Will try a regression in the next days; might need a moment. But we are also using OAUTH and it seems very probable that the correction fixes it... THANKS!
Re: [PR] Make audit log before/after filterable [airflow]
jscheffl commented on PR #39120: URL: https://github.com/apache/airflow/pull/39120#issuecomment-2075804286 > > I tried to "use" the audit log and either I am tooo stupid (rarely have clicked there, maybe I'm a noob in this) but: > > > > * When clicking in one of the date fields and picking a date the filter is not applied. > > * Clicking or pressing enter to a different aera clears the previous selcted data > > * I tried to use multiple options of include/exclude filter and it always when using include generated empty results or in case of exclude no effect. Somehow I was not able to filter and I did not understand where I can filter for. > > > > Might it be that with the changes something is broken? Or is my browser failing? Ubuntu 22.04 x64, Firefox. > > Something like selected a Before date and adding an event name doesn't work for you? https://private-user-images.githubusercontent.com/4600967/324977078-c06c0422-211c-4469-9cb6-2b6d9f2e6abf.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTM5OTA4MjEsIm5iZiI6MTcxMzk5MDUyMSwicGF0aCI6Ii80NjAwOTY3LzMyNDk3NzA3OC1jMDZjMDQyMi0yMTFjLTQ0NjktOWNiNi0yYjZkOWYyZTZhYmYucG5nP1gtQW16LUFsZ29yaXRobT1BV1M0LUhNQUMtU0hBMjU2JlgtQW16LUNyZWRlbnRpYWw9QUtJQVZDT0RZTFNBNTNQUUs0WkElMkYyMDI0MDQyNCUyRnVzLWVhc3QtMSUyRnMzJTJGYXdzNF9yZXF1ZXN0JlgtQW16LURhdGU9MjAyNDA0MjRUMjAyODQxWiZYLUFtei1FeHBpcmVzPTMwMCZYLUFtei1TaWduYXR1cmU9ZDc2Njg3YTI1MTkxODg2MWI5YmJkMGYwYWI1YjhjYTE2OGMyODA5ZjRmNmJjMmM2YTIzOTZhZDE4ZTgyNTEyMiZYLUFtei1TaWduZWRIZWFkZXJzPWhvc3QmYWN0b3JfaWQ9MCZrZXlfaWQ9MCZyZXBvX2lkPTAifQ.OqQj40YShM6ntFme2OIWR7QXjWV8RjhNXKru2YI7w8M;> Did a regression on my Ubuntu 20.04 x64 LTS: - Firefox - Both Start/End date are _not_ working. I can use the date picker but upon exit of the popup, date / time selection is reset - Event filter is working (when entering text, but no suggestions in the popup below, do I need to expect anything?) 
- Chromium - Date start/end selection is working. - Event filter is working (when entering text, but no suggestions in the popup below; do I need to expect anything?) UPDATE! I believe I found the root cause... in Firefox it looks like: ![image](https://github.com/apache/airflow/assets/95105677/86a72a8c-09a3-49d2-a34b-9bc40f80b4be) --> The DATE popup only shows a date selection. After clicking on a date the time fields are filled with --:--:--, and when exiting, the date selection is reset. If I change the time before leaving focus, the filter is applied. In Chromium, by contrast, the popup includes both date AND time: ![image](https://github.com/apache/airflow/assets/95105677/e7bf200a-1b55-4821-92e1-a5d357f3d968) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]
vincbeck commented on code in PR #39212: URL: https://github.com/apache/airflow/pull/39212#discussion_r1578495743 ## airflow/providers/celery/executors/celery_executor.py: ## @@ -368,8 +368,10 @@ def update_all_task_states(self) -> None: if state: self.update_task_state(key, state, info) -def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None: -super().change_state(key, state, info) +def change_state( +self, key: TaskInstanceKey, state: TaskInstanceState, info=None, remove_running=True +) -> None: +super().change_state(key, state, info, remove_running=remove_running) Review Comment: Same here, no? If the Airflow version is less than 2.10, it will not recognize `remove_running`?
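The compatibility concern raised here can be sketched in isolation. The classes below are toy stand-ins (not the real Airflow `BaseExecutor`), and `change_state_compat` is a hypothetical helper that inspects the parent method's signature before forwarding the newer keyword argument:

```python
import inspect

# Toy stand-ins for BaseExecutor before and after the assumed 2.10
# signature change discussed above -- not the real Airflow classes.
class BaseExecutorPre210:
    def change_state(self, key, state, info=None):
        return {"key": key, "state": state, "info": info}

class BaseExecutorPost210:
    def change_state(self, key, state, info=None, remove_running=True):
        return {"key": key, "state": state, "info": info, "remove_running": remove_running}

def change_state_compat(executor, key, state, info=None, remove_running=True):
    """Forward remove_running only when the target method accepts it."""
    params = inspect.signature(executor.change_state).parameters
    if "remove_running" in params:
        return executor.change_state(key, state, info, remove_running=remove_running)
    # Older signature: silently drop the keyword the base class cannot take.
    return executor.change_state(key, state, info)

print(change_state_compat(BaseExecutorPre210(), "ti_key", "success"))
print(change_state_compat(BaseExecutorPost210(), "ti_key", "success"))
```

The same signature check could be done once at import time rather than on every call, which is the usual trade-off for hot paths like executor state changes.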
Re: [I] Dynamic mapped tasks group arguments are interpreted as MappedArgument when provided to classic operators [airflow]
florian-guily commented on issue #39222: URL: https://github.com/apache/airflow/issues/39222#issuecomment-2075777459 > @florian-guily Would you like to be assigned to this issue? I'd like to, yes. I'll find the time to resolve it!
Re: [PR] [DRAFT] Switch to Connexion 3 framework [airflow]
potiuk commented on code in PR #39055: URL: https://github.com/apache/airflow/pull/39055#discussion_r1578443041 ## airflow/www/app.py: ## @@ -192,3 +209,8 @@ def purge_cached_app(): """Remove the cached version of the app in global state.""" global app app = None + + +def cached_flask_app(config=None, testing=False): Review Comment: Replaced with both app cached.
Re: [PR] [DRAFT] Switch to Connexion 3 framework [airflow]
potiuk commented on code in PR #39055: URL: https://github.com/apache/airflow/pull/39055#discussion_r1578442260 ## airflow/www/package.json: ## @@ -141,7 +141,8 @@ "reactflow": "^11.7.4", "redoc": "^2.0.0-rc.72", "remark-gfm": "^3.0.1", -"swagger-ui-dist": "4.1.3", +"sanitize-html": "^2.12.1", Review Comment: Removed.
(airflow) branch constraints-main updated: Updating constraints. Github run id:8821161824
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch constraints-main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/constraints-main by this push: new 1376aa1294 Updating constraints. Github run id:8821161824 1376aa1294 is described below commit 1376aa129487513f871f7ec1011dd4c3ff97a965 Author: Automated GitHub Actions commit AuthorDate: Wed Apr 24 19:41:22 2024 + Updating constraints. Github run id:8821161824 This update in constraints is automatically committed by the CI 'constraints-push' step based on 'refs/heads/main' in the 'apache/airflow' repository with commit sha 9619536e6f1f5737d56d2ef761c2e4467f17cd4e. The action that build those constraints can be found at https://github.com/apache/airflow/actions/runs/8821161824/ The image tag used for that build was: 9619536e6f1f5737d56d2ef761c2e4467f17cd4e. You can enter Breeze environment with this image by running 'breeze shell --image-tag 9619536e6f1f5737d56d2ef761c2e4467f17cd4e' All tests passed in this build so we determined we can push the updated constraints. See https://github.com/apache/airflow/blob/main/README.md#installing-from-pypi for details. 
--- constraints-3.10.txt | 15 +++ constraints-3.11.txt | 15 +++ constraints-3.12.txt | 15 +++ constraints-3.8.txt | 15 +++ constraints-3.9.txt | 15 +++ constraints-no-providers-3.10.txt | 4 ++-- constraints-no-providers-3.11.txt | 4 ++-- constraints-no-providers-3.12.txt | 4 ++-- constraints-no-providers-3.8.txt | 4 ++-- constraints-no-providers-3.9.txt | 4 ++-- constraints-source-providers-3.10.txt | 15 +++ constraints-source-providers-3.11.txt | 15 +++ constraints-source-providers-3.12.txt | 15 +++ constraints-source-providers-3.8.txt | 15 +++ constraints-source-providers-3.9.txt | 15 +++ 15 files changed, 80 insertions(+), 90 deletions(-) diff --git a/constraints-3.10.txt b/constraints-3.10.txt index 1d3934447d..78dd283592 100644 --- a/constraints-3.10.txt +++ b/constraints-3.10.txt @@ -1,6 +1,6 @@ # -# This constraints file was automatically generated on 2024-04-23T20:36:37.772543 +# This constraints file was automatically generated on 2024-04-24T18:26:41.767946 # via "eager-upgrade" mechanism of PIP. For the "main" branch of Airflow. # This variant of constraints install uses the HEAD of the branch version for 'apache-airflow' but installs # the providers from PIP-released packages at the moment of the constraint generation. 
@@ -229,7 +229,7 @@ bcrypt==4.1.2 beautifulsoup4==4.12.3 billiard==4.2.0 bitarray==2.9.2 -black==24.4.0 +black==24.4.1 blinker==1.7.0 boto3==1.34.69 botocore==1.34.69 @@ -283,7 +283,6 @@ docopt==0.6.2 docstring_parser==0.16 docutils==0.16 duckdb==0.10.2 -editables==0.5 elastic-transport==8.13.0 elasticsearch==8.13.0 email_validator==2.1.1 @@ -375,8 +374,8 @@ gssapi==1.8.3 gunicorn==22.0.0 h11==0.14.0 h2==4.1.0 -hatch==1.9.4 -hatchling==1.21.1 +hatch==1.9.6 +hatchling==1.24.2 hdfs==2.7.3 hmsclient==0.1.1 hpack==4.0.0 @@ -480,7 +479,7 @@ nodeenv==1.8.0 numpy==1.26.4 oauthlib==3.2.2 objsize==0.7.0 -openai==1.23.2 +openai==1.23.3 openapi-schema-validator==0.6.2 openapi-spec-validator==0.7.1 openlineage-integration-common==1.12.0 @@ -724,11 +723,11 @@ uv==0.1.35 validators==0.28.1 vertica-python==1.3.8 vine==5.1.0 -virtualenv==20.26.0 +virtualenv==20.25.3 watchtower==3.2.0 wcwidth==0.2.13 weaviate-client==3.26.2 -websocket-client==1.7.0 +websocket-client==1.8.0 wirerope==0.4.7 wrapt==1.16.0 xmlsec==1.3.13 diff --git a/constraints-3.11.txt b/constraints-3.11.txt index f5bf50aa28..5265c11aae 100644 --- a/constraints-3.11.txt +++ b/constraints-3.11.txt @@ -1,6 +1,6 @@ # -# This constraints file was automatically generated on 2024-04-23T20:36:37.241476 +# This constraints file was automatically generated on 2024-04-24T18:26:42.167347 # via "eager-upgrade" mechanism of PIP. For the "main" branch of Airflow. # This variant of constraints install uses the HEAD of the branch version for 'apache-airflow' but installs # the providers from PIP-released packages at the moment of the constraint generation. 
@@ -228,7 +228,7 @@ bcrypt==4.1.2 beautifulsoup4==4.12.3 billiard==4.2.0 bitarray==2.9.2 -black==24.4.0 +black==24.4.1 blinker==1.7.0 boto3==1.34.69 botocore==1.34.69 @@ -282,7 +282,6 @@ docopt==0.6.2 docstring_parser==0.16 docutils==0.16 duckdb==0.10.2 -editables==0.5 elastic-transport==8.13.0 elasticsearch==8.13.0 email_validator==2.1.1 @@ -373,8 +372,8 @@ gssapi==1.8.3 gunicorn==22.0.0 h11==0.14.0 h2==4.1.0 -hatch==1.9.4 -hatchling==1.21.1 +hatch==1.9.6 +hatchling==1.24.2 hdfs==2.7.3
Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]
RNHTTR commented on PR #39177: URL: https://github.com/apache/airflow/pull/39177#issuecomment-2075686454 I can't resolve my conversations for some reason... maybe because of some git quirk, as it's only showing one commit? Either way, my comments were addressed.
[PR] Power BI Provider [airflow]
ambika-garg opened a new pull request, #39243: URL: https://github.com/apache/airflow/pull/39243 # Apache Airflow Provider for Power BI. ## Operators ### PowerBIDatasetRefreshOperator The operator triggers the Power BI dataset refresh and pushes the details of refresh in Xcom. It can accept the following parameters: * `dataset_id`: The dataset Id. * `group_id`: The workspace Id. * `wait_for_termination`: (Default value: True) Wait until the pre-existing or current triggered refresh completes before exiting. * `force_refresh`: When enabled, it will force refresh the dataset again, after pre-existing ongoing refresh request is terminated. * `timeout`: Time in seconds to wait for a dataset to reach a terminal status for non-asynchronous waits. Used only if ``wait_for_termination`` is True. * `check_interval`: Number of seconds to wait before rechecking the refresh status. ## Hooks ### PowerBI Hook A hook to interact with Power BI. * `powerbi_conn_id`: Airflow Connection ID that contains the connection information for the Power BI account used for authentication. ## Features * Xcom Integration: The Power BI Dataset refresh operator enriches the Xcom with essential fields for downstream tasks: 1. `powerbi_dataset_refresh_id`: Request Id of the Dataset Refresh. 2. `powerbi_dataset_refresh_status`: Refresh Status. * `Unknown`: Refresh state is unknown or a refresh is in progress. * `Completed`: Refresh successfully completed. * `Failed`: Refresh failed (details in `powerbi_dataset_refresh_error`). * `Disabled`: Refresh is disabled by a selective refresh. 3. `powerbi_dataset_refresh_end_time`: The end date and time of the refresh (may be None if a refresh is in progress) 4. `powerbi_dataset_refresh_error`: Failure error code in JSON format (None if no error) * External Monitoring link: The operator conveniently provides a redirect link to the Power BI UI for monitoring refreshes. ## Sample DAG to use the plugin. 
Check out the sample DAG code below:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from operators.powerbi_refresh_dataset_operator import PowerBIDatasetRefreshOperator

with DAG(
    dag_id='refresh_dataset_powerbi',
    schedule_interval=None,
    start_date=datetime(2023, 8, 7),
    catchup=False,
    concurrency=20,
    tags=['powerbi', 'dataset', 'refresh']
) as dag:
    refresh_in_given_workspace = PowerBIDatasetRefreshOperator(
        task_id="refresh_in_given_workspace",
        dataset_id="<dataset-id>",  # the Id of the dataset to refresh
        group_id="<workspace-id>",  # the Id of the workspace
    )
```

--- **^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information. In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
Re: [I] Dynamic mapped tasks group arguments are interpreted as MappedArgument when provided to classic operators [airflow]
RNHTTR commented on issue #39222: URL: https://github.com/apache/airflow/issues/39222#issuecomment-2075666741 @florian-guily Would you like to be assigned to this issue?
Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]
eladkal commented on PR #38876: URL: https://github.com/apache/airflow/pull/38876#issuecomment-2075624714 > Should we consider yanking 2.9.0 once 2.9.1 is out with this fix? I don't think so. From my point of view, if we wanted to yank we should have merged this quickly and cut 2.9.1 immediately as a critical bug fix. I asked about this when the issue was discovered, but others thought we should wait for more bug fixes, as this was not categorized as critical.
Re: [PR] Run system tests using `dag.test()` [airflow]
potiuk commented on PR #39176: URL: https://github.com/apache/airflow/pull/39176#issuecomment-2075562155 It's a completely different mechanism - it just runs `_run_raw_task` for all tasks according to their dependencies - look at the code. If you want to run with a different executor, what you really need is to run the Airflow Scheduler with that executor and trigger the DAG (via API or CLI). You should not really use Pytest for that. Previously backfill was used to run the dags (this is how Debug Executor worked) - but that was a feature of Debug Executor - if you have the ECS executor, then having the scheduler run the DAG is probably much better.
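The mechanism described here can be illustrated with a toy sketch (not Airflow code): each task's raw callable is executed directly, in dependency order, with no executor or scheduler involved:

```python
def run_tasks_in_dependency_order(tasks, upstream, run_raw_task):
    """Toy illustration of dag.test()-style execution: run each task's
    raw callable directly, after all of its upstream tasks have run.
    `upstream` maps a task to the list of tasks it depends on."""
    done = set()

    def visit(task):
        if task in done:
            return
        for up in upstream.get(task, []):
            visit(up)  # make sure every upstream task ran first
        run_raw_task(task)
        done.add(task)

    for task in tasks:
        visit(task)

# A tiny extract -> transform -> load chain; "running" a task here
# just records its name, standing in for _run_raw_task.
order = []
run_tasks_in_dependency_order(
    ["extract", "transform", "load"],
    {"transform": ["extract"], "load": ["transform"]},
    order.append,
)
print(order)  # ['extract', 'transform', 'load']
```

Running with a real executor instead, as the comment notes, would mean standing up a scheduler and triggering the DAG externally, which is a different mechanism entirely.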
Re: [PR] Run system tests using `dag.test()` [airflow]
vincbeck commented on PR #39176: URL: https://github.com/apache/airflow/pull/39176#issuecomment-2075546584 Question on this one though. The doc says `dag.test()` does not use an executor at all, so it is not possible, somehow, to configure it to use a specific executor. I had in mind to possibly run some system tests with a different executor (e.g. ECSExecutor). How hard would that be?
Re: [PR] Deprecation of AutoML services: Add deprecation warnings and raise exceptions for already deprecated ones [airflow]
potiuk commented on PR #38673: URL: https://github.com/apache/airflow/pull/38673#issuecomment-2075537682 cc: @VladaZakharova - there were some doubts from the Google team about those deprecations - does this one look ok for you?
Re: [PR] Allow trust env parameter to be defined in extra options of HTTP Connection [airflow]
potiuk merged PR #39161: URL: https://github.com/apache/airflow/pull/39161
(airflow) branch main updated (12ce2dcd46 -> 9619536e6f)
This is an automated email from the ASF dual-hosted git repository. potiuk pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 12ce2dcd46 Retry merge commit retrieval on failure (#39231) add 9619536e6f Allow trust env parameter to be defined in extra options of HTTP Connection (#39161) No new revisions were added by this update. Summary of changes: airflow/providers/http/hooks/http.py| 4 tests/providers/http/hooks/test_http.py | 23 +++ 2 files changed, 27 insertions(+)
Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]
pankajkoti commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1578308432 ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"delete_on_error": self.delete_on_error, }, ) async def run(self) -> AsyncIterator[TriggerEvent]: -while True: -cluster = await self.get_async_hook().get_cluster( -project_id=self.project_id, region=self.region, cluster_name=self.cluster_name +try: +while True: +cluster = await self.fetch_cluster() +state = cluster.status.state +if state == ClusterStatus.State.ERROR: +await self.delete_when_error_occurred(cluster) +yield TriggerEvent( +{ +"cluster_name": self.cluster_name, +"cluster_state": ClusterStatus.State.DELETING, +"cluster": cluster, +} +) +return +elif state == ClusterStatus.State.RUNNING: +yield TriggerEvent( +{ +"cluster_name": self.cluster_name, +"cluster_state": state, +"cluster": cluster, +} +) +return +self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds) +await asyncio.sleep(self.polling_interval_seconds) +except asyncio.CancelledError: +try: +if self.delete_on_error: +self.log.info("Deleting cluster %s.", self.cluster_name) +# The synchronous hook is utilized to delete the cluster when a task is cancelled. +# This is because the asynchronous hook deletion is not awaited when the trigger task +# is cancelled. The call for deleting the cluster through the sync hook is not a blocking +# call, which means it does not wait until the cluster is deleted. 
+self.get_sync_hook().delete_cluster( +region=self.region, cluster_name=self.cluster_name, project_id=self.project_id +) +self.log.info("Deleted cluster %s during cancellation.", self.cluster_name) +except Exception as e: +self.log.error("Error during cancellation handling: %s", e) +raise AirflowException("Error during cancellation handling: %s", e) + +async def fetch_cluster(self) -> Cluster: +"""Fetch the cluster status.""" +return await self.get_async_hook().get_cluster( +project_id=self.project_id, region=self.region, cluster_name=self.cluster_name +) + +async def delete_when_error_occurred(self, cluster: Cluster): +""" +Delete the cluster on error. + +:param cluster: The cluster to delete. +""" +if self.delete_on_error: +self.log.info("Deleting cluster %s.", self.cluster_name) +await self.get_async_hook().delete_cluster( +region=self.region, cluster_name=self.cluster_name, project_id=self.project_id +) +self.log.info("Cluster %s has been deleted.", self.cluster_name) +else: +self.log.info( +"Cluster %s is not be deleted as delete_on_error is set to False.", self.cluster_name Review Comment: ```suggestion "Cluster %s is not deleted as delete_on_error is set to False.", self.cluster_name ```
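The cancellation-handling pattern in the trigger above can be reduced to a minimal, self-contained sketch (all names here are illustrative, not Airflow APIs): poll asynchronously, and on cancellation run a *synchronous* cleanup, since an async cleanup may never be awaited once the trigger task is being cancelled:

```python
import asyncio

async def poll_with_cleanup(poll, sync_cleanup, interval=0.01):
    """Poll until done; if the task is cancelled, run a synchronous
    cleanup before re-raising, mirroring the trigger's use of the
    sync hook to delete the cluster during cancellation."""
    try:
        while True:
            if await poll():
                return "done"
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        sync_cleanup()  # blocking call, so it completes even mid-cancellation
        raise

async def demo():
    events = []

    async def never_done():
        return False

    task = asyncio.create_task(
        poll_with_cleanup(never_done, lambda: events.append("deleted cluster"))
    )
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return events

print(asyncio.run(demo()))  # ['deleted cluster']
```

Re-raising the `CancelledError` after cleanup is deliberate: swallowing it would make the task appear to finish normally instead of cancelled.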
Re: [PR] Bump minimum Airflow version in providers to Airflow 2.7.0 [airflow]
potiuk commented on code in PR #39240: URL: https://github.com/apache/airflow/pull/39240#discussion_r1578307454 ## dev/breeze/src/airflow_breeze/global_constants.py: ## @@ -473,10 +473,8 @@ def _exclusion(providers: Iterable[str]) -> str: BASE_PROVIDERS_COMPATIBILITY_CHECKS: list[dict[str, str]] = [ { "python-version": "3.8", -"airflow-version": "2.6.0", -"remove-providers": _exclusion( -["openlineage", "common.io", "cohere", "fab", "qdrant", "microsoft.azure"] -), +"airflow-version": "2.7.0", Review Comment: You should just remove 2.7.0 -> there is a small incompatibility in the cohere provider with importlib that makes it technically compatible with `2.7.1`, not 2.7.0 - that's why 2.7.1 is added below. I think it's not worth complicating things by additionally excluding `cohere` for 2.7.0 - technically there is a very small difference between 2.7.0 and 2.7.1, so I'd rather leave 2.7.1.
Re: [PR] Resolve deprecations in Python sensors/operators/decorators [airflow]
Taragolis commented on code in PR #39241: URL: https://github.com/apache/airflow/pull/39241#discussion_r1578300862 ## tests/deprecations_ignore.yml: ## @@ -349,23 +313,8 @@ - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_execute_on_failure_callbacks_without_dag - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_failure_callbacks_should_not_drop_hostname - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_process_file_should_failure_callback -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_fail_multiple_outputs_key_type -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_fail_multiple_outputs_no_dict -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_manual_multiple_outputs_false_with_typings -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_empty_dict -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_ignore_typing -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_return_none -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_python_callable_arguments_are_templatized -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_python_callable_keyword_arguments_are_templatized -- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_xcom_arg +# Inlets/Outlets use TaskInstance.xcom_push() with deprecated arguments in airflow/lineage/__init__.py Review Comment: https://github.com/apache/airflow/blob/c6bc0529805be98cffbf336070abee32b93ca39a/airflow/lineage/__init__.py#L84-L92 ```console ERRORairflow.task:taskinstance.py:2974 Task failed with exception Traceback (most recent call last): File "/opt/airflow/airflow/models/taskinstance.py", line 2550, in _run_raw_task self._execute_task_with_callbacks(context, test_mode, session=session) File 
"/opt/airflow/airflow/models/taskinstance.py", line 2756, in _execute_task_with_callbacks self.task.post_execute(context=context, result=result) File "/opt/airflow/airflow/lineage/__init__.py", line 85, in wrapper self.xcom_push( File "/opt/airflow/airflow/models/baseoperator.py", line 1590, in xcom_push context["ti"].xcom_push(key=key, value=value, execution_date=execution_date) File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper return func(*args, session=session, **kwargs) File "/opt/airflow/airflow/models/taskinstance.py", line 3270, in xcom_push warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3) airflow.exceptions.RemovedInAirflow3Warning: Passing 'execution_date' to 'TaskInstance.xcom_push()' is deprecated. ```
[PR] Resolve deprecations in Python sensors/operators/decorators [airflow]
Taragolis opened a new pull request, #39241: URL: https://github.com/apache/airflow/pull/39241 related: https://github.com/apache/airflow/issues/38642
(airflow) branch main updated: Retry merge commit retrieval on failure (#39231)
This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 12ce2dcd46 Retry merge commit retrieval on failure (#39231) 12ce2dcd46 is described below commit 12ce2dcd46f6ad59e0a03925654a92126c2e5164 Author: Jarek Potiuk AuthorDate: Wed Apr 24 19:49:22 2024 +0200 Retry merge commit retrieval on failure (#39231) Sometimes when merge commit retrieval fails (most likely because of a race condition, as the pull request event is not ready yet), build-images builds all images because selective check does not know the merge commit (in a pull request target build-image failure). That was the only side effect, because otherwise build image had a fallback to github.event.pull_request.head.sha in this case. With this PR, we will retry it and fall back explicitly to github.event.pull_request.head.sha if it does not work. --- .github/workflows/build-images.yml | 19 ++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml index 9c732d1166..bd10e73aac 100644 --- a/.github/workflows/build-images.yml +++ b/.github/workflows/build-images.yml @@ -91,8 +91,25 @@ jobs: - name: Discover PR merge commit id: discover-pr-merge-commit run: | + # Sometimes target-commit-sha cannot be TARGET_COMMIT_SHA="$(gh api '${{ github.event.pull_request.url }}' --jq .merge_commit_sha)" - echo "TARGET_COMMIT_SHA=$TARGET_COMMIT_SHA" >> ${GITHUB_ENV} + if [[ ${TARGET_COMMIT_SHA} == "" ]]; then +# Sometimes retrieving the merge commit SHA from PR fails. We retry it once. Otherwise we +# fall-back to github.event.pull_request.head.sha +echo +echo "Could not retrieve merge commit SHA from PR, waiting for 3 seconds and retrying." 
+echo +sleep 3 +TARGET_COMMIT_SHA="$(gh api '${{ github.event.pull_request.url }}' --jq .merge_commit_sha)" +if [[ ${TARGET_COMMIT_SHA} == "" ]]; then + echo + echo "Could not retrieve merge commit SHA from PR, falling back to PR head SHA." + echo + TARGET_COMMIT_SHA="${{ github.event.pull_request.head.sha }}" +fi + fi + echo "TARGET_COMMIT_SHA=${TARGET_COMMIT_SHA}" + echo "TARGET_COMMIT_SHA=${TARGET_COMMIT_SHA}" >> ${GITHUB_ENV} echo "target-commit-sha=${TARGET_COMMIT_SHA}" >> ${GITHUB_OUTPUT} if: github.event_name == 'pull_request_target' # The labels in the event aren't updated when re-triggering the job, So lets hit the API to get
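The retry-then-fallback logic of the workflow step above can be sketched generically (a hypothetical helper, not Airflow or CI code): try to fetch a value, retry on an empty result, then fall back to a known-good default:

```python
import time

def fetch_with_retry_and_fallback(fetch, fallback, retries=1, delay=0.0):
    """Return fetch()'s first non-empty result, retrying up to
    `retries` times; if every attempt is empty, return `fallback`
    (like falling back to github.event.pull_request.head.sha)."""
    for attempt in range(retries + 1):
        value = fetch()
        if value:
            return value
        if attempt < retries:
            time.sleep(delay)  # brief pause before retrying, like the 3s sleep above
    return fallback

# Flaky source: empty on the first call, populated on the second.
responses = iter(["", "abc123"])
print(fetch_with_retry_and_fallback(lambda: next(responses), "head-sha"))  # abc123

# A source that never recovers: the fallback is used instead.
print(fetch_with_retry_and_fallback(lambda: "", "head-sha"))  # head-sha
```

As in the workflow, the fallback value should be something that is always available, even if slightly less precise than the preferred one.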
Re: [PR] Retry merge commit retrieval on failure [airflow]
potiuk merged PR #39231: URL: https://github.com/apache/airflow/pull/39231
Re: [PR] Pinecone provider support for `pinecone-client`>=3 [airflow]
eladkal commented on code in PR #37307: URL: https://github.com/apache/airflow/pull/37307#discussion_r1578266977 ## airflow/providers/pinecone/provider.yaml: ## @@ -45,7 +45,7 @@ dependencies: # Pinecone Python SDK v3.0.0 was released at 2024-01-16 and introduce some breaking changes. # It's crucial to adhere to the v3.0.0 Migration Guide before the upper-bound limitation can be removed. # https://canyon-quilt-082.notion.site/Pinecone-Python-SDK-v3-0-0-Migration-Guide-056d3897d7634bf7be399676a4757c7b Review Comment: This note can now be removed, can't it?
Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]
eladkal commented on code in PR #39212: URL: https://github.com/apache/airflow/pull/39212#discussion_r1578261459 ## airflow/providers/amazon/aws/executors/ecs/ecs_executor.py: ## @@ -400,7 +400,7 @@ def attempt_task_runs(self): else: task = run_task_response["tasks"][0] self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number) -self.queued(task_key, task.task_arn) +self.running_state(task_key, task.task_arn) Review Comment: Ah, maybe it's because the code doesn't import `running_state`
Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]
eladkal commented on code in PR #39212: URL: https://github.com/apache/airflow/pull/39212#discussion_r1578260345 ## airflow/providers/amazon/aws/executors/ecs/ecs_executor.py: ## @@ -400,7 +400,7 @@ def attempt_task_runs(self): else: task = run_task_response["tasks"][0] self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number) -self.queued(task_key, task.task_arn) +self.running_state(task_key, task.task_arn) Review Comment: but I am still surprised: compat tests for 2.6 / 2.7.1 / 2.8.0 passed in the CI. I think it means that we are missing coverage for the code path that invokes this function. cc @potiuk, am I right?
Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]
o-nikolas commented on code in PR #39212: URL: https://github.com/apache/airflow/pull/39212#discussion_r1578255216 ## airflow/providers/amazon/aws/executors/ecs/ecs_executor.py: ## @@ -400,7 +400,7 @@ def attempt_task_runs(self): else: task = run_task_response["tasks"][0] self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number) -self.queued(task_key, task.task_arn) +self.running_state(task_key, task.task_arn) Review Comment: Ooo, I see what you mean now, good catch! I suppose we could set a min version, but instead I'll try calling that new method in a try/catch and swallow the exception. The only real consequence of that is that task adoption will not work (but this is a nice-to-have and many executors don't support it); the code to support it is what introduced this regression in the first place anyway. So task adoption will only work from 2.10 forward, but the rest will be backwards compatible.
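The fallback o-nikolas describes can be sketched in plain Python. This is a hypothetical illustration, not the merged code: the executor classes below are stand-ins, and the point is only that `running_state` exists on newer base executors, so the call is attempted and an `AttributeError` is swallowed on older ones, at the cost of task adoption.

```python
class OldBaseExecutor:
    """Stand-in for an Airflow < 2.10 base executor (no running_state)."""


class NewBaseExecutor:
    """Stand-in for an Airflow >= 2.10 base executor."""

    def __init__(self):
        self.running = {}

    def running_state(self, task_key, info):
        # Newer base executors track running tasks for metrics/adoption.
        self.running[task_key] = info


def report_running(executor, task_key, task_arn):
    # Hypothetical compat shim: call the new method if present,
    # otherwise swallow the failure (only task adoption is lost).
    try:
        executor.running_state(task_key, task_arn)
    except AttributeError:
        pass
```

On a new base executor the state is recorded; on an old one the call is a no-op, so scheduling proceeds normally either way.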
Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]
eladkal commented on code in PR #39212: URL: https://github.com/apache/airflow/pull/39212#discussion_r1578257489 ## airflow/providers/amazon/aws/executors/ecs/ecs_executor.py: ## @@ -400,7 +400,7 @@ def attempt_task_runs(self): else: task = run_task_response["tasks"][0] self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number) -self.queued(task_key, task.task_arn) +self.running_state(task_key, task.task_arn) Review Comment: Normally we backport by copying the needed function into the provider as a private function, with a note to remove the workaround once the minimum Airflow version is high enough.
[PR] Bump minimum Airflow version in providers to Airflow 2.7.0 [airflow]
eladkal opened a new pull request, #39240: URL: https://github.com/apache/airflow/pull/39240 --- **^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information. In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
(airflow) branch main updated: Fixed side effect of menu filtering causing disappearing menus (#39229)
This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/main by this push: new 0d2c0c5cf0 Fixed side effect of menu filtering causing disappearing menus (#39229) 0d2c0c5cf0 is described below commit 0d2c0c5cf04ef886a8211820d0dc2f4cd8c47251 Author: Jarek Potiuk AuthorDate: Wed Apr 24 19:09:09 2024 +0200 Fixed side effect of menu filtering causing disappearing menus (#39229) The default implementation of filter_permitted_menu_items had a subtle side-effect. The filtering on child items was done in-place and modified the menu itself, so it was enough to get the same worker serve requests for multiple users for the same menu to get the items removed for subsequent user - even if they had permission to see it. Deepcopying the menu items before filtering them should fix the problem Fixes: #39204 Fixes: #39135 --- airflow/auth/managers/base_auth_manager.py| 14 -- tests/auth/managers/test_base_auth_manager.py | 40 +++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/airflow/auth/managers/base_auth_manager.py b/airflow/auth/managers/base_auth_manager.py index 7bb4e92889..44fc53a66e 100644 --- a/airflow/auth/managers/base_auth_manager.py +++ b/airflow/auth/managers/base_auth_manager.py @@ -21,6 +21,7 @@ from abc import abstractmethod from functools import cached_property from typing import TYPE_CHECKING, Container, Literal, Sequence +from flask_appbuilder.menu import MenuItem from sqlalchemy import select from airflow.auth.managers.models.resource_details import ( @@ -34,7 +35,6 @@ from airflow.utils.session import NEW_SESSION, provide_session if TYPE_CHECKING: from flask import Blueprint -from flask_appbuilder.menu import MenuItem from sqlalchemy.orm import Session from airflow.auth.managers.models.base_user import BaseUser @@ -397,13 +397,21 @@ class BaseAuthManager(LoggingMixin): ) accessible_items = 
[] for menu_item in items: +menu_item_copy = MenuItem( +name=menu_item.name, +icon=menu_item.icon, +label=menu_item.label, +childs=[], +baseview=menu_item.baseview, +cond=menu_item.cond, +) if menu_item.childs: accessible_children = [] for child in menu_item.childs: if self.security_manager.has_access(ACTION_CAN_ACCESS_MENU, child.name): accessible_children.append(child) -menu_item.childs = accessible_children -accessible_items.append(menu_item) +menu_item_copy.childs = accessible_children +accessible_items.append(menu_item_copy) return accessible_items @abstractmethod diff --git a/tests/auth/managers/test_base_auth_manager.py b/tests/auth/managers/test_base_auth_manager.py index 64d33f6065..a39b60787c 100644 --- a/tests/auth/managers/test_base_auth_manager.py +++ b/tests/auth/managers/test_base_auth_manager.py @@ -313,3 +313,43 @@ class TestBaseAuthManager: assert result[1].name == "item3" assert len(result[1].childs) == 1 assert result[1].childs[0].name == "item3.1" + +@patch.object(EmptyAuthManager, "security_manager") +def test_filter_permitted_menu_items_twice(self, mock_security_manager, auth_manager): +mock_security_manager.has_access.side_effect = [ +# 1st call +True, # menu 1 +False, # menu 2 +True, # menu 3 +True, # Item 3.1 +False, # Item 3.2 +# 2nd call +False, # menu 1 +True, # menu 2 +True, # menu 3 +False, # Item 3.1 +True, # Item 3.2 +] + +menu = Menu() +menu.add_link("item1") +menu.add_link("item2") +menu.add_link("item3") +menu.add_link("item3.1", category="item3") +menu.add_link("item3.2", category="item3") + +result = auth_manager.filter_permitted_menu_items(menu.get_list()) + +assert len(result) == 2 +assert result[0].name == "item1" +assert result[1].name == "item3" +assert len(result[1].childs) == 1 +assert result[1].childs[0].name == "item3.1" + +result = auth_manager.filter_permitted_menu_items(menu.get_list()) + +assert len(result) == 2 +assert result[0].name == "item2" +assert result[1].name == "item3" +assert len(result[1].childs) == 1 
+assert result[1].childs[0].name == "item3.2"
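The side effect the commit above fixes can be reproduced with a plain-Python sketch. `MenuItem` here is a simplified stand-in for flask_appbuilder's class; the buggy variant mutates the shared menu in place (so a later user served by the same worker sees already-filtered children), while the fixed variant builds copies.

```python
class MenuItem:
    """Simplified stand-in for flask_appbuilder.menu.MenuItem."""

    def __init__(self, name, childs=None):
        self.name = name
        self.childs = childs or []


def filter_in_place(items, allowed):
    # Buggy variant: filtering mutates the shared menu itself.
    for item in items:
        item.childs = [c for c in item.childs if c.name in allowed]
    return items


def filter_with_copy(items, allowed):
    # Fixed variant: build per-request copies; the shared menu is untouched.
    return [
        MenuItem(item.name, [c for c in item.childs if c.name in allowed])
        for item in items
    ]
```

Running both against the same menu shows why the fix matters: the copy-based filter leaves the original intact, the in-place one permanently drops items for every subsequent user.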
Re: [I] Pools have disappeared from the Admin menu [airflow]
potiuk closed issue #39204: Pools have disappeared from the Admin menu URL: https://github.com/apache/airflow/issues/39204
Re: [I] front - admin menu - drop-down non deterministic [airflow]
potiuk closed issue #39135: front - admin menu - drop-down non deterministic URL: https://github.com/apache/airflow/issues/39135
Re: [PR] Fixed side effect of menu filtering causing disappearing menus [airflow]
potiuk merged PR #39229: URL: https://github.com/apache/airflow/pull/39229
Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]
kahlstrm commented on code in PR #39177: URL: https://github.com/apache/airflow/pull/39177#discussion_r1578227079 ## airflow/utils/log/file_task_handler.py: ## @@ -384,7 +380,13 @@ def _read( worker_log_full_path = Path(self.local_base, worker_log_rel_path) local_messages, local_logs = self._read_from_local(worker_log_full_path) messages_list.extend(local_messages) -if is_running and not executor_messages: +if ( +ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) +and not executor_messages +and not remote_logs Review Comment: Exactly, and this is what we encountered in our environment after upgrading.
Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]
eladkal commented on code in PR #39212: URL: https://github.com/apache/airflow/pull/39212#discussion_r1578223632 ## airflow/providers/amazon/aws/executors/ecs/ecs_executor.py: ## @@ -400,7 +400,7 @@ def attempt_task_runs(self): else: task = run_task_response["tasks"][0] self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number) -self.queued(task_key, task.task_arn) +self.running_state(task_key, task.task_arn) Review Comment: but what will happen if someone installs a newer version of the provider with Airflow 2.8.x / 2.9.0? The call to `self.running_state` would not work, wouldn't it? It's quite surprising to me that the backwards compat tests passed; I may be missing something here?
Re: [PR] Fixed side effect of menu filtering causing disappearing menus [airflow]
potiuk commented on PR #39229: URL: https://github.com/apache/airflow/pull/39229#issuecomment-2075405977 I had to make it a bit more complex, because of infinite recursion
(airflow) branch main updated (26e5b9ffe0 -> 6db6fef357)
This is an automated email from the ASF dual-hosted git repository. bbovenzi pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 26e5b9ffe0 Resolve deprecations in `tests/utils/` (#39228) add 6db6fef357 Use grid view for Task Instance's `log_url` (#39183) No new revisions were added by this update. Summary of changes: airflow/models/taskinstance.py | 10 ++ tests/models/test_taskinstance.py | 9 + tests/providers/smtp/notifications/test_smtp.py | 2 +- 3 files changed, 12 insertions(+), 9 deletions(-)
Re: [PR] Use grid view for Task Instance's `log_url` [airflow]
bbovenzi merged PR #39183: URL: https://github.com/apache/airflow/pull/39183
Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]
RNHTTR commented on code in PR #39177: URL: https://github.com/apache/airflow/pull/39177#discussion_r1578209347 ## airflow/utils/log/file_task_handler.py: ## @@ -384,7 +380,13 @@ def _read( worker_log_full_path = Path(self.local_base, worker_log_rel_path) local_messages, local_logs = self._read_from_local(worker_log_full_path) messages_list.extend(local_messages) -if is_running and not executor_messages: +if ( +ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) +and not executor_messages +and not remote_logs Review Comment: I see. So, when clearing a _previous_ task instance try, if you don't have remote logs configured, you won't be able to see this try's task instance logs until the task completes?
Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]
RNHTTR commented on code in PR #39177: URL: https://github.com/apache/airflow/pull/39177#discussion_r1578196224 ## airflow/utils/log/file_task_handler.py: ## @@ -404,11 +406,15 @@ def _read( ) log_pos = len(logs) messages = "".join([f"*** {x}\n" for x in messages_list]) +end_of_log = ti.try_number != try_number or ti.state not in ( Review Comment: Nitpick: Since you check whether `ti.state` is or isn't in `(TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)` twice, would it make sense to replace `is_running` with `is_in_running_or_deferred` and use that when checking for `executor_messages`/`remote_logs` and when defining `end_of_log`?
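The refactor suggested in that nitpick can be sketched as follows. This is an illustrative sketch, not the PR's code: `TaskInstanceState` values are stubbed as plain strings, and the two checks are collapsed into one descriptive flag reused for `end_of_log`.

```python
# Stand-ins for TaskInstanceState.RUNNING / TaskInstanceState.DEFERRED.
RUNNING, DEFERRED = "running", "deferred"


def compute_end_of_log(ti_state, ti_try_number, requested_try_number):
    # Hoist the repeated membership test into a single flag, as suggested.
    is_in_running_or_deferred = ti_state in (RUNNING, DEFERRED)
    # The log is final if we're reading an older try, or the task is no
    # longer running/deferred.
    return ti_try_number != requested_try_number or not is_in_running_or_deferred
```

The same `is_in_running_or_deferred` flag would also gate the served-log fallback, so the condition is computed exactly once.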
Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]
o-nikolas commented on code in PR #39212: URL: https://github.com/apache/airflow/pull/39212#discussion_r1578193059 ## airflow/providers/amazon/aws/executors/ecs/ecs_executor.py: ## @@ -400,7 +400,7 @@ def attempt_task_runs(self): else: task = run_task_response["tasks"][0] self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number) -self.queued(task_key, task.task_arn) +self.running_state(task_key, task.task_arn) Review Comment: It still functioned _mostly_ correctly. The running and queued task sets are maintained mostly by the base executor and used for emitting metrics and to calculate open slots. I think the regression is only present in airflow 2.9
Re: [PR] Pinecone provider support for `pinecone-client`>=3 [airflow]
rawwar commented on PR #37307: URL: https://github.com/apache/airflow/pull/37307#issuecomment-2075351057 @sunank200, @Lee-W, do you have any more feedback for the PR?
Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]
dstandish commented on PR #38876: URL: https://github.com/apache/airflow/pull/38876#issuecomment-2075334734 Should we consider yanking 2.9.0 once 2.9.1 is out with this fix?
(airflow) branch main updated (14b631294d -> 26e5b9ffe0)
This is an automated email from the ASF dual-hosted git repository. taragolis pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git from 14b631294d Fix: Only quote the keys of the query_parameters in MSGraphOperator (#39207) add 26e5b9ffe0 Resolve deprecations in `tests/utils/` (#39228) No new revisions were added by this update. Summary of changes: tests/deprecations_ignore.yml | 38 -- tests/utils/log/test_log_reader.py | 1 - tests/utils/test_dates.py | 8 +++-- tests/utils/test_db_cleanup.py | 4 +++ tests/utils/test_email.py | 34 --- tests/utils/test_log_handlers.py | 12 +++ tests/utils/test_sqlalchemy.py | 2 ++ tests/utils/test_state.py | 1 + .../test_task_handler_with_custom_formatter.py | 7 +++- tests/utils/test_types.py | 2 ++ 10 files changed, 56 insertions(+), 53 deletions(-)
Re: [PR] Resolve deprecations in `tests/utils/` [airflow]
Taragolis merged PR #39228: URL: https://github.com/apache/airflow/pull/39228
[I] KubernetesPodOperator fails due to getting logs [airflow]
antoniocorralsierra opened a new issue, #39239: URL: https://github.com/apache/airflow/issues/39239 ### Apache Airflow Provider(s) cncf-kubernetes ### Versions of Apache Airflow Providers apache-airflow-providers-cncf-kubernetes 8.0.1 apache-airflow-providers-celery 3.6.1 ### Apache Airflow version 2.8.3 ### Operating System Debian GNU/Linux 12 (bookworm) ### Deployment Official Apache Airflow Helm Chart ### Deployment details _No response_ ### What happened I have an Airflow instance deployed on a GKE cluster with helm (keda is activated on workers to scale from 1 to 5 with a cooldownPeriod set to 240). I use CeleryKubernetesExecutor and I run the tasks with KubernetesPodOperator on the celery workers. To avoid having an active task on a celery worker while the KubernetesPodOperator is running, I activate deferrable mode on them. I set poll_interval to 10 seconds. After that, many errors with a 404 status code appear. Situation: `example_task = KubernetesPodOperator(task_id="example_task", name="example_task", get_logs=True, on_finish_action="delete_pod", log_events_on_failure=True, deferrable=True, poll_interval=10, logging_interval=None)` We can suppose that the rest of the input params are correct. The steps are: 1. The celery worker gets the task. 2. The KubernetesPodOperator creates a new pod on the cluster that executes the task. 3. The status of the task changes to deferred. 4. Checking the triggerer log I see that the task is complete: `[2024-04-24T12:48:57.740+] {triggerer_job_runner.py:623} INFO - trigger example_task (ID 668324) completed` 5. Checking the status of the pod, it is completed (success) and finished at [2024-04-24 12:48:51.122]. 6. The task changes to queued status, waiting for a gap on a celery worker to run. 7. When a gap is available on a celery worker the task begins to run. 8. After running, the task finishes with an error and changes to up_for_retry status. 
The task logs is: `[2024-04-24, 12:51:21 UTC] {taskinstance.py:2731} ERROR - Task failed with exception` `Traceback (most recent call last):` ` File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task` `result = _execute_callable(context=context, **execute_callable_kwargs)` ` File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable` `return execute_callable(context=context, **execute_callable_kwargs)` ` File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1604, in resume_execution` `return execute_callable(context)` ` File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 738, in trigger_reentry` `self.write_logs(self.pod)` ` File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 771, in write_logs` `logs = self.pod_manager.read_pod_logs(` ` File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 324, in wrapped_f` `return self(f, *args, **kw)` ` File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 404, in __call__` `do = self.iter(retry_state=retry_state)` ` File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 360, in iter` `raise retry_exc.reraise()` ` File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 193, in reraise` `raise self.last_attempt.result()` ` File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result` `return self.__get_result()` ` File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result` `raise self._exception` ` File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 407, in __call__` `result = fn(*args, **kwargs)` ` File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 668, in read_pod_logs` `logs = self._client.read_namespaced_pod_log(` ` File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 23957, in read_namespaced_pod_log` `return self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs) # noqa: E501` ` File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 24076, in read_namespaced_pod_log_with_http_info` `return self.api_client.call_api(` ` File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 348, in call_api` `return self.__call_api(resource_path, method,`
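The traceback above ends in a `read_namespaced_pod_log` call that fails because the pod is already gone. A hypothetical defensive wrapper for that situation could look like this — `ApiException` is stubbed here (in a real deployment it would come from `kubernetes.client.rest`), and this is a workaround sketch, not the provider's actual fix:

```python
class ApiException(Exception):
    """Stub for kubernetes.client.rest.ApiException (carries a status)."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


def read_logs_or_note(read_fn):
    # If the pod was already deleted (404, as in the issue), return a
    # placeholder message instead of failing the whole task.
    try:
        return read_fn()
    except ApiException as exc:
        if exc.status == 404:
            return "*** pod not found; logs unavailable ***"
        raise  # any other API error still propagates
```

This only degrades the log output; other API errors still fail loudly.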
[PR] Remove unnecessary validation from cncf provider. [airflow]
VShkaberda opened a new pull request, #39238: URL: https://github.com/apache/airflow/pull/39238 Remove the check for `initialExecutors` (if `dynamicAllocation` is enabled) as unnecessary. The [Spark documentation](https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation) states that the default value for `spark.dynamicAllocation.initialExecutors` is `spark.dynamicAllocation.minExecutors`. There is no need to pass `initialExecutors` explicitly. --- **^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information. In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
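The PR's rationale can be made concrete with an illustrative configuration. The property names below follow Spark's documented dynamic-allocation settings; the values are arbitrary examples, and the dict is just a sketch of how `initialExecutors` can be omitted:

```python
# Dynamic allocation without an explicit initialExecutors: per the Spark
# docs, spark.dynamicAllocation.initialExecutors defaults to
# spark.dynamicAllocation.minExecutors, so requiring it in validation
# is unnecessary. Values here are arbitrary.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "10",
    # "spark.dynamicAllocation.initialExecutors" intentionally omitted
}

# What Spark effectively uses as the initial executor count:
initial = dynamic_allocation_conf.get(
    "spark.dynamicAllocation.initialExecutors",
    dynamic_allocation_conf["spark.dynamicAllocation.minExecutors"],
)
```

With `initialExecutors` absent, the effective initial count falls back to `minExecutors`, which is exactly why the provider-side check was redundant.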
Re: [PR] Remove unnecessary validation from cncf provider. [airflow]
boring-cyborg[bot] commented on PR #39238: URL: https://github.com/apache/airflow/pull/39238#issuecomment-2075282097 Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst) Here are some useful points: - Pay attention to the quality of your code (ruff, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#prerequisites-for-pre-commit-hooks) will help you with that. - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it. - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/dev/breeze/doc/README.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations. - Be patient and persistent. It might take some time to get a review or get the final approval from Committers. - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack. - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#coding-style-and-best-practices). - Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits. Apache Airflow is a community-driven project and together we are making it better . 
In case of doubts contact the developers at: Mailing List: d...@airflow.apache.org Slack: https://s.apache.org/airflow-slack
Re: [PR] Fix DataprocSubmitJobOperator in deferrable mode=True when task is marked as failed. [airflow]
sunank200 commented on code in PR #39230: URL: https://github.com/apache/airflow/pull/39230#discussion_r1578107932 ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -91,20 +93,28 @@ def serialize(self): "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"cancel_on_kill": self.cancel_on_kill, }, ) async def run(self): -while True: -job = await self.get_async_hook().get_job( -project_id=self.project_id, region=self.region, job_id=self.job_id -) -state = job.status.state -self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) -if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): -break -await asyncio.sleep(self.polling_interval_seconds) -yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +try: +while True: +job = await self.get_async_hook().get_job( +project_id=self.project_id, region=self.region, job_id=self.job_id +) +state = job.status.state +self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) +if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): +break +await asyncio.sleep(self.polling_interval_seconds) +yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +except asyncio.CancelledError: +self.log.info("Task got cancelled.") +if self.job_id and self.cancel_on_kill: +await self.get_async_hook().cancel_job( Review Comment: Yes, I tested this ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -91,20 +93,28 @@ def serialize(self): "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"cancel_on_kill": self.cancel_on_kill, }, ) async def run(self): -while True: -job = await self.get_async_hook().get_job( -project_id=self.project_id, region=self.region, job_id=self.job_id -) -state = job.status.state -self.log.info("Dataproc 
job: %s is in state: %s", self.job_id, state) -if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): -break -await asyncio.sleep(self.polling_interval_seconds) -yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +try: +while True: +job = await self.get_async_hook().get_job( +project_id=self.project_id, region=self.region, job_id=self.job_id +) +state = job.status.state +self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) +if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): +break +await asyncio.sleep(self.polling_interval_seconds) +yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +except asyncio.CancelledError: +self.log.info("Task got cancelled.") +if self.job_id and self.cancel_on_kill: +await self.get_async_hook().cancel_job( Review Comment: I have added the test
Re: [PR] Fix DataprocSubmitJobOperator in deferrable mode=True when task is marked as failed. [airflow]
sunank200 commented on code in PR #39230: URL: https://github.com/apache/airflow/pull/39230#discussion_r1578105965 ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -91,20 +93,28 @@ def serialize(self): "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"cancel_on_kill": self.cancel_on_kill, }, ) async def run(self): -while True: -job = await self.get_async_hook().get_job( -project_id=self.project_id, region=self.region, job_id=self.job_id -) -state = job.status.state -self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) -if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): -break -await asyncio.sleep(self.polling_interval_seconds) -yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +try: +while True: +job = await self.get_async_hook().get_job( +project_id=self.project_id, region=self.region, job_id=self.job_id +) +state = job.status.state +self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) +if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): +break +await asyncio.sleep(self.polling_interval_seconds) +yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +except asyncio.CancelledError: +self.log.info("Task got cancelled.") +if self.job_id and self.cancel_on_kill: +await self.get_async_hook().cancel_job( +job_id=self.job_id, project_id=self.project_id, region=self.region +) Review Comment: Yes, I have tested this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
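The cancellation-handling pattern in the diff above can be sketched in isolation: a polling coroutine catches `asyncio.CancelledError`, performs async cleanup (the remote job cancellation), and re-raises. The `poll_job`/`cancel_job` names here are illustrative stand-ins, not the real hook API, and unlike the diff this sketch re-raises the error, which is the conventional way to let cancellation propagate.

```python
import asyncio

async def cancel_job(log):
    # stand-in for hook.cancel_job(...); just records that it ran
    log.append("cancelled remote job")

async def poll_job(log, cancel_on_kill=True):
    try:
        while True:
            log.append("polling")
            await asyncio.sleep(10)  # poll interval; cancellation lands here
    except asyncio.CancelledError:
        log.append("task got cancelled")
        if cancel_on_kill:
            await cancel_job(log)  # async cleanup is allowed during cancellation
        raise  # re-raise so the cancellation propagates to the awaiter

async def main():
    log = []
    task = asyncio.create_task(poll_job(log))
    await asyncio.sleep(0)  # let the task reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log

result = asyncio.run(main())
print(result)  # → ['polling', 'task got cancelled', 'cancelled remote job']
```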
Re: [PR] Fix DataprocSubmitJobOperator in deferrable mode=True when task is marked as failed. [airflow]
sunank200 commented on code in PR #39230: URL: https://github.com/apache/airflow/pull/39230#discussion_r1578100315 ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -91,20 +93,28 @@ def serialize(self): "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"cancel_on_kill": self.cancel_on_kill, }, ) async def run(self): -while True: -job = await self.get_async_hook().get_job( -project_id=self.project_id, region=self.region, job_id=self.job_id -) -state = job.status.state -self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) -if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): -break -await asyncio.sleep(self.polling_interval_seconds) -yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +try: +while True: +job = await self.get_async_hook().get_job( +project_id=self.project_id, region=self.region, job_id=self.job_id +) +state = job.status.state +self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) +if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): +break +await asyncio.sleep(self.polling_interval_seconds) +yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +except asyncio.CancelledError: +self.log.info("Task got cancelled.") +if self.job_id and self.cancel_on_kill: +await self.get_async_hook().cancel_job( +job_id=self.job_id, project_id=self.project_id, region=self.region +) Review Comment: Yes. I tested this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add Grid button to Task Instance view [airflow]
bbovenzi commented on code in PR #39223: URL: https://github.com/apache/airflow/pull/39223#discussion_r1578104205 ## airflow/www/templates/airflow/task_instance.html: ## @@ -39,6 +39,9 @@ {{ url_for(endpoint, dag_id=dag.dag_id, task_id=task_id, execution_date=execution_date) }} {%- endif -%} {% endmacro -%} + Review Comment: Nice! Could we include the task_id, dag_run_id, map_index in the params to link back to the same task instance? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Fix DataprocSubmitJobOperator in deferrable mode=True when task is marked as failed. [airflow]
sunank200 commented on code in PR #39230: URL: https://github.com/apache/airflow/pull/39230#discussion_r1578101930 ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -91,20 +93,28 @@ def serialize(self): "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"cancel_on_kill": self.cancel_on_kill, }, ) async def run(self): -while True: -job = await self.get_async_hook().get_job( -project_id=self.project_id, region=self.region, job_id=self.job_id -) -state = job.status.state -self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) -if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): -break -await asyncio.sleep(self.polling_interval_seconds) -yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +try: +while True: +job = await self.get_async_hook().get_job( +project_id=self.project_id, region=self.region, job_id=self.job_id +) +state = job.status.state +self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) +if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): +break +await asyncio.sleep(self.polling_interval_seconds) +yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +except asyncio.CancelledError: +self.log.info("Task got cancelled.") +if self.job_id and self.cancel_on_kill: +await self.get_async_hook().cancel_job( Review Comment: Yes. 
I tested this ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -91,20 +93,28 @@ def serialize(self): "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"cancel_on_kill": self.cancel_on_kill, }, ) async def run(self): -while True: -job = await self.get_async_hook().get_job( -project_id=self.project_id, region=self.region, job_id=self.job_id -) -state = job.status.state -self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) -if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): -break -await asyncio.sleep(self.polling_interval_seconds) -yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +try: +while True: +job = await self.get_async_hook().get_job( +project_id=self.project_id, region=self.region, job_id=self.job_id +) +state = job.status.state +self.log.info("Dataproc job: %s is in state: %s", self.job_id, state) +if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR): +break +await asyncio.sleep(self.polling_interval_seconds) +yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job}) +except asyncio.CancelledError: +self.log.info("Task got cancelled.") +if self.job_id and self.cancel_on_kill: +await self.get_async_hook().cancel_job( Review Comment: Yes i tested this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] KubernetesPodOperator duplicating logs when interrupted [airflow]
raphaelauv commented on issue #39236: URL: https://github.com/apache/airflow/issues/39236#issuecomment-2075176634 Could you try the latest version, 8.1.1, of `apache-airflow-providers-cncf-kubernetes`?
Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]
sunank200 commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1578042750 ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"delete_on_error": self.delete_on_error, }, ) async def run(self) -> AsyncIterator[TriggerEvent]: -while True: -cluster = await self.get_async_hook().get_cluster( -project_id=self.project_id, region=self.region, cluster_name=self.cluster_name +"""Run the trigger.""" +try: +while True: +cluster = await self.fetch_cluster() +state = cluster.status.state +if state == ClusterStatus.State.ERROR: +await self.delete_when_error_occurred(cluster) +yield TriggerEvent( +{ +"cluster_name": self.cluster_name, +"cluster_state": state.DELETING, Review Comment: I have changed it to `ClusterStatus.State.DELETING` ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"delete_on_error": self.delete_on_error, }, ) async def run(self) -> AsyncIterator[TriggerEvent]: -while True: -cluster = await self.get_async_hook().get_cluster( -project_id=self.project_id, region=self.region, cluster_name=self.cluster_name +"""Run the trigger.""" +try: +while True: +cluster = await self.fetch_cluster() +state = cluster.status.state +if state == ClusterStatus.State.ERROR: +await self.delete_when_error_occurred(cluster) +yield TriggerEvent( +{ +"cluster_name": self.cluster_name, +"cluster_state": state.DELETING, Review Comment: I have changed it to `ClusterStatus.State.DELETING` and this is correct state -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Use grid view for Task Instance's `log_url` [airflow]
AetherUnbound commented on PR #39183: URL: https://github.com/apache/airflow/pull/39183#issuecomment-2075151250 @bbovenzi there's one failing test, but it looks like it might be a flaky one; would you mind re-running it?
Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]
sunank200 commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1578038004 ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"delete_on_error": self.delete_on_error, }, ) async def run(self) -> AsyncIterator[TriggerEvent]: -while True: -cluster = await self.get_async_hook().get_cluster( -project_id=self.project_id, region=self.region, cluster_name=self.cluster_name +"""Run the trigger.""" +try: +while True: +cluster = await self.fetch_cluster() +state = cluster.status.state +if state == ClusterStatus.State.ERROR: +await self.delete_when_error_occurred(cluster) +yield TriggerEvent( +{ +"cluster_name": self.cluster_name, +"cluster_state": state.ERROR, Review Comment: ClusterStatus.State.DELETING is better -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]
sunank200 commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1578035001 ## tests/providers/google/cloud/triggers/test_dataproc.py: ## @@ -215,9 +228,48 @@ async def test_cluster_run_loop_is_still_running( await asyncio.sleep(0.5) assert not task.done() -assert f"Current state is: {ClusterStatus.State.CREATING}" +assert f"Current state is: {ClusterStatus.State.CREATING}." assert f"Sleeping for {TEST_POLL_INTERVAL} seconds." +@pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster") +async def test_fetch_cluster_status(self, mock_get_cluster, cluster_trigger, async_get_cluster): +mock_get_cluster.return_value = async_get_cluster( +status=ClusterStatus(state=ClusterStatus.State.RUNNING) +) +cluster = await cluster_trigger.fetch_cluster() + +assert cluster.status.state == ClusterStatus.State.RUNNING, "The cluster state should be RUNNING" + +@pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.delete_cluster") +async def test_delete_when_error_occurred(self, mock_delete_cluster, cluster_trigger): +mock_cluster = mock.MagicMock(spec=Cluster) +type(mock_cluster).status = mock.PropertyMock( +return_value=mock.MagicMock(state=ClusterStatus.State.ERROR) +) + +mock_delete_future = asyncio.Future() +mock_delete_future.set_result(None) +mock_delete_cluster.return_value = mock_delete_future + +cluster_trigger.delete_on_error = True + +await cluster_trigger.delete_when_error_occurred(mock_cluster) + +mock_delete_cluster.assert_called_once_with( +region=cluster_trigger.region, +cluster_name=cluster_trigger.cluster_name, +project_id=cluster_trigger.project_id, +) + +mock_delete_cluster.reset_mock() +cluster_trigger.delete_on_error = False + +await cluster_trigger.delete_when_error_occurred(mock_cluster) + +mock_delete_cluster.assert_not_called() + Review Comment: Added -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
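The test added in the hunk above patches an async hook method and checks it only runs when `delete_on_error` is set. The same pattern can be written more compactly with `unittest.mock.AsyncMock`; the `Trigger` class below is a minimal stand-in, not the real `DataprocClusterTrigger`.

```python
import asyncio
from unittest import mock

class Trigger:
    """Stand-in for a trigger that deletes the cluster on error only when asked."""
    def __init__(self, hook, delete_on_error):
        self.hook = hook
        self.delete_on_error = delete_on_error

    async def delete_when_error_occurred(self):
        if self.delete_on_error:
            await self.hook.delete_cluster()

async def check(delete_on_error):
    hook = mock.AsyncMock()  # every attribute is an awaitable mock
    await Trigger(hook, delete_on_error).delete_when_error_occurred()
    return hook.delete_cluster.await_count

called = asyncio.run(check(True))
skipped = asyncio.run(check(False))
print(called, skipped)  # → 1 0
```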
[PR] Moved low-level logic to call endpoint from trigger to run method hook MSGraph [airflow]
dabla opened a new pull request, #39237: URL: https://github.com/apache/airflow/pull/39237 Moved the logic that calls the MSGraph endpoint from MSGraphTrigger into the run method of the KiotaRequestAdapterHook. This mirrors the HttpHook/HttpOperator approach; the long-term goal is that both could become one. It is also in line with Airflow's preferred direction of having fewer, more generic operators that use interchangeable, more specialized hooks to achieve the same goal. --- **^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information. In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
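The refactor described above can be sketched abstractly: the hook owns the low-level endpoint call, and the trigger (like HttpOperator over HttpHook) merely delegates to it. The class and method names here are placeholders, not KiotaRequestAdapterHook's real API.

```python
import asyncio

class EndpointHook:
    """Owns the low-level request logic, reusable by operators and triggers."""
    async def run(self, url):
        # a real hook would perform the request here; we fake a response
        return {"url": url, "status": 200}

class EndpointTrigger:
    """Delegates to the hook; no longer knows how the endpoint is called."""
    def __init__(self, hook, url):
        self.hook, self.url = hook, url

    async def run(self):
        yield await self.hook.run(self.url)

async def main():
    trigger = EndpointTrigger(EndpointHook(), "https://graph.example/me")
    return [event async for event in trigger.run()]

events = asyncio.run(main())
print(events)  # → [{'url': 'https://graph.example/me', 'status': 200}]
```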
Re: [I] KubernetesPodOperator duplicating logs when interrupted [airflow]
boring-cyborg[bot] commented on issue #39236: URL: https://github.com/apache/airflow/issues/39236#issuecomment-2075062319 Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
[I] KubernetesPodOperator duplicating logs when interrupted [airflow]
Nikita-Sobolev opened a new issue, #39236: URL: https://github.com/apache/airflow/issues/39236 ### Apache Airflow version Other Airflow 2 version (please specify below) ### If "Other Airflow 2 version" selected, which one? 2.8.1 ### What happened? The KubernetesPodOperator duplicates a task's logs (each line appears twice) when log reading is interrupted (`log read interrupted but container base still running`). This happens randomly on different DAGs and on different runs of the same DAG. It is presumably connected to https://github.com/apache/airflow/issues/35019 ### What you think should happen instead? Logs should not be duplicated. ### How to reproduce KubernetesPodOperator on a cloud AKS cluster ### Operating System Ubuntu 22.04 ### Versions of Apache Airflow Providers apache-airflow==2.8.1 apache-airflow-providers-amazon==8.16.0 apache-airflow-providers-celery==3.5.1 apache-airflow-providers-cncf-kubernetes==7.13.0 apache-airflow-providers-common-io==1.2.0 apache-airflow-providers-common-sql==1.10.0 apache-airflow-providers-docker==3.9.1 apache-airflow-providers-elasticsearch==5.3.1 apache-airflow-providers-ftp==3.7.0 apache-airflow-providers-google==10.13.1 apache-airflow-providers-grpc==3.4.1 apache-airflow-providers-hashicorp==3.6.1 apache-airflow-providers-http==4.8.0 apache-airflow-providers-imap==3.5.0 apache-airflow-providers-microsoft-azure==8.5.1 apache-airflow-providers-mysql==5.5.1 apache-airflow-providers-odbc==4.4.0 apache-airflow-providers-openlineage==1.4.0 apache-airflow-providers-postgres==5.10.0 apache-airflow-providers-redis==3.6.0 apache-airflow-providers-sendgrid==3.4.0 apache-airflow-providers-sftp==4.8.1 apache-airflow-providers-slack==8.5.1 apache-airflow-providers-snowflake==5.2.1 apache-airflow-providers-sqlite==3.7.0 apache-airflow-providers-ssh==3.10.0 google-cloud-orchestration-airflow==1.10.0 ### Deployment Official Apache Airflow Helm Chart ### Deployment details _No response_ ### Anything else? 
![Untitled](https://github.com/apache/airflow/assets/59029283/f558cb03-75c0-4b25-ac8a-ff9f2945ece5) ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] openlineage: use `ProcessPoolExecutor` over `ThreadPoolExecutor`. [airflow]
JDarDagran opened a new pull request, #39235: URL: https://github.com/apache/airflow/pull/39235 --- This may possibly fix #39232. I don't really know how to write tests for the change (a load test would be the right thing, but do we run load tests as part of the regular test suite?). Some guidance would be very helpful.
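The swap the PR title describes is largely mechanical because the two executors share the `concurrent.futures` interface; the difference is that a thread pool serializes CPU-bound Python behind the GIL while a process pool runs it in separate interpreters. A minimal sketch of the interchangeability (not the provider's actual code; `busy_square` is an illustrative workload, and `fork` is used here only to keep the demo self-contained — real code should guard pool creation under `if __name__ == "__main__":` when the start method is `spawn`):

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def busy_square(n):
    # stand-in for CPU-bound work; must be module-level to be picklable
    total = 0
    for _ in range(1000):
        total = n * n
    return total

def run_with(executor_cls, **kwargs):
    # identical call pattern for both executor classes
    with executor_cls(max_workers=2, **kwargs) as pool:
        return list(pool.map(busy_square, range(5)))

threaded = run_with(ThreadPoolExecutor)
forked = run_with(ProcessPoolExecutor, mp_context=multiprocessing.get_context("fork"))
print(threaded, forked == threaded)  # → [0, 1, 4, 9, 16] True
```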
[PR] Bugfix yaml parsing for GKEStartKueueInsideClusterOperator [airflow]
moiseenkov opened a new pull request, #39234: URL: https://github.com/apache/airflow/pull/39234 Fix YAML parsing for `GKEStartKueueInsideClusterOperator`
Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]
sunank200 commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1577939404 ## airflow/providers/google/cloud/triggers/dataproc.py: ## @@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, "polling_interval_seconds": self.polling_interval_seconds, +"delete_on_error": self.delete_on_error, }, ) async def run(self) -> AsyncIterator[TriggerEvent]: -while True: -cluster = await self.get_async_hook().get_cluster( -project_id=self.project_id, region=self.region, cluster_name=self.cluster_name +"""Run the trigger.""" +try: +while True: +cluster = await self.fetch_cluster() +state = cluster.status.state +if state == ClusterStatus.State.ERROR: +await self.delete_when_error_occurred(cluster) +yield TriggerEvent( +{ +"cluster_name": self.cluster_name, +"cluster_state": state.ERROR, Review Comment: I think a better idea would just be to return the state as it is as it is being fetched from the cluster. And here if you look at code above its `state = cluster.status.state` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Fix batching for BigQueryToPostgresOperator [airflow]
moiseenkov opened a new pull request, #39233: URL: https://github.com/apache/airflow/pull/39233 Fixed batching for the following operators: - BigQueryToPostgresOperator - BigQueryToMsSqlOperator - BigQueryToMySqlOperator Previously, if users specified the `batch_size` parameter, it was used only for reading from BigQuery; writing to the database was performed with the default batch size of 1000 rows. This PR fixes that: the specified `batch_size` is now used for both reading from BigQuery and writing to the database. Additionally, the system test was updated.
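The fix's core idea can be sketched generically: a single `batch_size` should drive both the read from the source and the write to the target. The `read_batch`/`write_batch` callables below are illustrative stand-ins, not the operators' real API.

```python
def transfer_in_batches(read_batch, write_batch, batch_size):
    """read_batch(offset, limit) returns up to `limit` rows; write_batch(rows) stores them."""
    offset = 0
    batch_sizes = []
    while True:
        rows = read_batch(offset, batch_size)
        if not rows:
            break
        write_batch(rows)  # written with the SAME batch size as the read
        batch_sizes.append(len(rows))
        offset += len(rows)
    return batch_sizes

source = list(range(10))
target = []

batches = transfer_in_batches(
    read_batch=lambda off, lim: source[off:off + lim],
    write_batch=target.extend,
    batch_size=4,
)
print(batches, target == source)  # → [4, 4, 2] True
```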
[I] openlineage, celery: scheduler hanging when emitting lots of OL events via HTTP [airflow]
JDarDagran opened a new issue, #39232: URL: https://github.com/apache/airflow/issues/39232 ### Apache Airflow Provider(s) celery, openlineage ### Versions of Apache Airflow Providers _No response_ ### Apache Airflow version 2.9.0, but happens on 2.7+ too ### Operating System Darwin MacBook-Pro.local 23.4.0 Darwin Kernel Version 23.4.0: Fri Mar 15 00:10:42 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6000 arm64 ### Deployment Other ### Deployment details The issue can be reproduced in all environments, both locally with breeze and in cloud deployments, e.g. Astro Cloud. ### What happened The OpenLineage listener hooks into DagRun state changes via `on_dag_run_running/failed/success`. When OL events are emitted via HTTP at large scale, the scheduler hangs and needs a restart. **The issue appears to happen only with `CeleryExecutor`.** It could not be reproduced when disabling OpenLineage (with `[openlineage] disabled = True`) or with any other OpenLineage transport that doesn't use HTTP. I also experimented with using raw `urllib3` or `httpx` as alternatives to `requests`. All of the experiments produced the same bug, resulting in the scheduler hanging.
### What you think should happen instead When reproducing with local breeze setup with CeleryExecutor there’s this strange behaviour: ```htop```: ![image](https://github.com/apache/airflow/assets/3889552/9e006e71-cf50-42b9-bff5-0e2e72b080fa) ```lsof | grep CLOSE_WAIT```: ![image](https://github.com/apache/airflow/assets/3889552/03087fbf-9744-4301-905a-226b0f721a97) Stack from main loop of scheduler: ```bash Traceback for thread 152 (airflow) [] (most recent call last): (Python) File "/usr/local/bin/airflow", line 8, in sys.exit(main()) (Python) File "/opt/airflow/airflow/__main__.py", line 58, in main args.func(args) (Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command return func(*args, **kwargs) (Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper return f(*args, **kwargs) (Python) File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function return func(*args, **kwargs) (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 58, in scheduler run_command_with_daemon_option( (Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 85, in run_command_with_daemon_option callback() (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 61, in callback=lambda: _run_scheduler_job(args), (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job run_job(job=job_runner.job, execute_callable=job_runner._execute) (Python) File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper return func(*args, session=session, **kwargs) (Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job return execute_job(job, execute_callable=execute_callable) (Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in execute_job ret = execute_callable() (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 847, in _execute self._run_scheduler_loop() (Python) File 
"/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 982, in _run_scheduler_loop self.job.executor.heartbeat() (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 240, in heartbeat self.trigger_tasks(open_slots) (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 298, in trigger_tasks self._process_tasks(task_tuples) (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, in _process_tasks key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send) (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 342, in _send_tasks_to_celery key_and_async_results = list( (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 484, in _chain_from_iterable_of_lists for element in iterable: (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator yield fs.pop().result() (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 439, in result self._condition.wait(timeout) (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait waiter.acquire() ``` Stack from one of the child spawned scheduler processes ```bash Traceback
Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]
pankajkoti commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1577943509

## airflow/providers/google/cloud/triggers/dataproc.py:

@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )

     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.ERROR,

Review Comment:
   But that would be an inaccurate state, no? Since we triggered the delete action after that state. It might be good to fetch the latest state, or use `ClusterStatus.State.DELETING`, since we know that that would be the cluster state.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
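The concern in this review is that the trigger yields the stale `ERROR` snapshot even though a delete was issued after it was observed. The sketch below is a hypothetical stand-in (`FakeCluster` and `run_trigger` are not the provider's real API) showing the re-fetch pattern the reviewers suggest, so the emitted event carries the cluster's current state:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class ClusterStatusStub:
    state: str

class FakeCluster:
    """Hypothetical stand-in for the async Dataproc hook's cluster lookups."""
    def __init__(self, states):
        self._states = list(states)

    async def fetch(self):
        # Each fetch returns the next observed state, e.g. ERROR then DELETING.
        return ClusterStatusStub(self._states.pop(0))

async def run_trigger(cluster):
    status = await cluster.fetch()
    if status.state == "ERROR":
        # delete_when_error_occurred(...) would run here; after triggering
        # the delete, re-fetch so the event carries the current state
        # ("DELETING"), not the stale "ERROR" snapshot.
        status = await cluster.fetch()
    return {"cluster_state": status.state}

event = asyncio.run(run_trigger(FakeCluster(["ERROR", "DELETING"])))
print(event)  # {'cluster_state': 'DELETING'}
```

The alternative raised in the thread, yielding the literal `ClusterStatus.State.DELETING` without a second round trip, avoids the extra API call at the cost of asserting a state that was never actually observed.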
Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]
sunank200 commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1577939404

## airflow/providers/google/cloud/triggers/dataproc.py:

@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )

     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.ERROR,

Review Comment:
   I think a better idea would just be to return the state as it is being fetched from the cluster.
Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]
sunank200 commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1577936986

## airflow/providers/google/cloud/triggers/dataproc.py:

@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )

     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""

Review Comment:
   Removed it.
Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]
eladkal commented on code in PR #39212: URL: https://github.com/apache/airflow/pull/39212#discussion_r1577932047

## airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:

@@ -400,7 +400,7 @@ def attempt_task_runs(self):
             else:
                 task = run_task_response["tasks"][0]
                 self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number)
-                self.queued(task_key, task.task_arn)
+                self.running_state(task_key, task.task_arn)

Review Comment:
   Does this mean that the executor won't work for airflow<2.10?
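The compatibility question above is about an executor calling a base-class method that only newer Airflow versions provide. One common way to keep such a change backward compatible is a guarded call that falls back to the older method. The sketch below uses hypothetical stubs (`OldExecutorStub`, `NewExecutorStub`, `report_task_started` are not the real ECS executor code) to show the pattern:

```python
class OldExecutorStub:
    """Stand-in for a BaseExecutor that predates a running-state callback."""
    def __init__(self):
        self.events = []

    def queued(self, key, arn):
        self.events.append(("queued", key))

class NewExecutorStub(OldExecutorStub):
    """Stand-in for a BaseExecutor that provides running_state()."""
    def running_state(self, key, arn):
        self.events.append(("running", key))

def report_task_started(executor, key, arn):
    # Guarded call: prefer the newer method when the base class provides it,
    # fall back to the older one so the same provider code still runs on
    # older Airflow versions -- the concern raised in the review.
    if hasattr(executor, "running_state"):
        executor.running_state(key, arn)
    else:
        executor.queued(key, arn)

old, new = OldExecutorStub(), NewExecutorStub()
report_task_started(old, "task-1", "arn:1")
report_task_started(new, "task-2", "arn:2")
print(old.events, new.events)  # [('queued', 'task-1')] [('running', 'task-2')]
```

Whether the Amazon provider should take this route, or instead pin a minimum Airflow version, is exactly the trade-off the reviewer is asking about.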