(airflow) branch main updated (6781c632e3 -> d08f893f25)

2024-04-24 Thread pankaj
This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 6781c632e3 doc: dynamictaskmapping pythonoperator op_kwargs (#39242)
 add d08f893f25 Expose AWS IAM missing param in Hashicorp secret (#38536)

No new revisions were added by this update.

Summary of changes:
 .../hashicorp/_internal_client/vault_client.py | 40 +-
 airflow/providers/hashicorp/provider.yaml  |  6 
 airflow/providers/hashicorp/secrets/vault.py   |  3 ++
 .../secrets-backends/hashicorp-vault.rst   | 12 +++
 4 files changed, 53 insertions(+), 8 deletions(-)



Re: [PR] Expose AWS IAM missing param in Hashicorp secret [airflow]

2024-04-24 Thread via GitHub


pankajastro merged PR #38536:
URL: https://github.com/apache/airflow/pull/38536


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Fix trigger kwarg encryption migration [airflow]

2024-04-24 Thread via GitHub


jedcunningham commented on code in PR #39246:
URL: https://github.com/apache/airflow/pull/39246#discussion_r1578894900


##
airflow/migrations/versions/0140_2_9_0_update_trigger_kwargs_type.py:
##
@@ -38,13 +41,43 @@
 airflow_version = "2.9.0"
 
 
+def get_session() -> sa.orm.Session:
+    conn = op.get_bind()
+    sessionmaker = sa.orm.sessionmaker()
+    return sessionmaker(bind=conn)
+
+
 def upgrade():
-    """Update trigger kwargs type to string"""
+    """Update trigger kwargs type to string and encrypt"""
     with op.batch_alter_table("trigger") as batch_op:
         batch_op.alter_column("kwargs", type_=sa.Text())
 
+    if not context.is_offline_mode():
+        session = get_session()
+        try:
+            for trigger in session.query(Trigger):
+                trigger.kwargs = trigger.kwargs
+            session.commit()
+        finally:
+            session.close()
+
 
 def downgrade():
-    """Unapply update trigger kwargs type to string"""
+    """Unapply update trigger kwargs type to string and encrypt"""
+    if context.is_offline_mode():
+        print(dedent("""
+            --  WARNING: Unable to decrypt trigger kwargs automatically in offline mode!

Review Comment:
   I don't think we can do any better than spitting out a warning :(.
   
   But if folks have ideas here, happy to hear them.



##
airflow/models/trigger.py:
##
@@ -116,7 +116,15 @@ def _decrypt_kwargs(encrypted_kwargs: str) -> dict[str, Any]:
     from airflow.models.crypto import get_fernet
     from airflow.serialization.serialized_objects import BaseSerialization
 
-    decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
+    # We weren't able to encrypt the kwargs in all migration paths,
+    # so we need to handle the case where they are not encrypted.
+    # Triggers aren't long lasting, so we can skip encrypting them now.

Review Comment:
   We could start doing this if we'd like to, but I'd suggest we do it in a follow-up PR.



##
tests/models/test_trigger.py:
##
@@ -378,3 +380,19 @@ def test_serialize_sensitive_kwargs():
     assert isinstance(trigger_row.encrypted_kwargs, str)
     assert "value1" not in trigger_row.encrypted_kwargs
     assert "value2" not in trigger_row.encrypted_kwargs
+
+
+def test_kwargs_not_encrypted():
+    """
+    Tests that we don't decrypt kwargs if they aren't encrypted.
+    We weren't able to encrypt the kwargs in all migration paths.
+    """
+    trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    # force the `encrypted_kwargs` to be unencrypted, like they would be after an upgrade
+    trigger.encrypted_kwargs = json.dumps(
+        BaseSerialization.serialize({"param1": "value1", "param2": "value2"})
+    )
+    print(trigger.encrypted_kwargs)

Review Comment:
   ```suggestion
   ```
   
   Nothing to see here 🤦



##
airflow/models/trigger.py:
##
@@ -116,7 +116,15 @@ def _decrypt_kwargs(encrypted_kwargs: str) -> dict[str, Any]:
     from airflow.models.crypto import get_fernet
     from airflow.serialization.serialized_objects import BaseSerialization
 
-    decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
+    # We weren't able to encrypt the kwargs in all migration paths,
+    # so we need to handle the case where they are not encrypted.
+    # Triggers aren't long lasting, so we can skip encrypting them now.
+    if encrypted_kwargs.startswith("{"):
+        decrypted_kwargs = json.loads(encrypted_kwargs)

Review Comment:
   This fixes the offline upgrade path by detecting that the kwargs aren't actually encrypted.
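   The detection idea is easy to reproduce outside Airflow. A minimal sketch (using base64 as a stand-in for Fernet; real Fernet tokens are URL-safe base64 starting with `gAAAA`, so they can never begin with `{`, which is what makes the check safe):

   ```python
   import base64
   import json


   def encrypt(raw: str) -> str:
       # Stand-in for Fernet encryption; like a Fernet token, the output
       # is base64 text that never starts with "{".
       return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("utf-8")


   def decrypt_kwargs(stored: str) -> dict:
       # Kwargs left behind by an offline migration are plain serialized
       # JSON, which always starts with "{"; short-circuit and skip
       # decryption in that case.
       if stored.startswith("{"):
           return json.loads(stored)
       decoded = base64.urlsafe_b64decode(stored.encode("utf-8")).decode("utf-8")
       return json.loads(decoded)
   ```

   Both the encrypted and the never-encrypted value round-trip through the same accessor, which is the property the fix relies on.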



##
tests/models/test_trigger.py:
##
@@ -378,3 +380,19 @@ def test_serialize_sensitive_kwargs():
     assert isinstance(trigger_row.encrypted_kwargs, str)
     assert "value1" not in trigger_row.encrypted_kwargs
     assert "value2" not in trigger_row.encrypted_kwargs
+
+
+def test_kwargs_not_encrypted():
+    """
+    Tests that we don't decrypt kwargs if they aren't encrypted.
+    We weren't able to encrypt the kwargs in all migration paths.
+    """
+    trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    # force the `encrypted_kwargs` to be unencrypted, like they would be after an upgrade

Review Comment:
   ```suggestion
       # force the `encrypted_kwargs` to be unencrypted, like they would be after an offline upgrade
   ```






Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]

2024-04-24 Thread via GitHub


jedcunningham commented on PR #38876:
URL: https://github.com/apache/airflow/pull/38876#issuecomment-2076404146

   I've opened #39246 as an alternative solution to this.





[PR] Fix trigger kwarg encryption migration [airflow]

2024-04-24 Thread via GitHub


jedcunningham opened a new pull request, #39246:
URL: https://github.com/apache/airflow/pull/39246

   Closes: #38836 
   Alternative to #38876 
   
   Do the encryption in the migration itself, and fix support for offline migrations as well.
   
   The offline up migration won't actually encrypt the trigger kwargs, as there isn't a safe way to accomplish that, so the decryption process checks and short-circuits if the value isn't encrypted.
   
   The offline down migration will now print a warning that the offline migration will fail if there are any running triggers. I think this is the best we can do for that scenario (and folks willing to do offline migrations will hopefully be able to understand the situation).
   
   This also solves the "encrypting the already encrypted kwargs" bug in 2.9.0.
   
   





Re: [PR] [FEAT] added notebook error in databricks deferrable handler [airflow]

2024-04-24 Thread via GitHub


Lee-W commented on PR #39110:
URL: https://github.com/apache/airflow/pull/39110#issuecomment-2076387313

   I think we're good to merge this one. Just want to see whether @pankajkoti has some time to double-check in case I missed anything. I'm going to merge this later today. Please let me know if anyone wants to take a deeper look.





Re: [PR] [FEAT] added notebook error in databricks deferrable handler [airflow]

2024-04-24 Thread via GitHub


gaurav7261 commented on PR #39110:
URL: https://github.com/apache/airflow/pull/39110#issuecomment-2076349163

   Hi @pankajkoti, can you please review?





(airflow) branch main updated: doc: dynamictaskmapping pythonoperator op_kwargs (#39242)

2024-04-24 Thread uranusjr
This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 6781c632e3 doc: dynamictaskmapping pythonoperator op_kwargs (#39242)
6781c632e3 is described below

commit 6781c632e3ba7fd4404f21c8b932e1374a4a9322
Author: raphaelauv 
AuthorDate: Thu Apr 25 06:40:06 2024 +0200

doc: dynamictaskmapping pythonoperator op_kwargs (#39242)

Co-authored-by: raphaelauv 
---
 .../authoring-and-scheduling/dynamic-task-mapping.rst  | 18 ++
 1 file changed, 18 insertions(+)

diff --git a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
index acf9cfba3d..739bc6fee9 100644
--- a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
@@ -291,6 +291,24 @@ Sometimes an upstream needs to specify multiple arguments to a downstream operator.
 
 This produces two task instances at run-time printing ``1`` and ``2`` respectively.
 
+Also, it is possible to mix ``expand_kwargs`` with most operator arguments, such as the ``op_kwargs`` of the PythonOperator:
+
+.. code-block:: python
+
+    def print_args(x, y):
+        print(x)
+        print(y)
+        return x + y
+
+
+    PythonOperator.partial(task_id="task-1", python_callable=print_args).expand_kwargs(
+        [
+            {"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
+            {"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
+        ]
+    )
+
+
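Conceptually, each dict passed to ``expand_kwargs`` becomes one mapped task instance, and its ``op_kwargs`` entry is unpacked into the callable. A plain-Python sketch of that fan-out (an illustration of the semantics, not Airflow's actual implementation):

```python
def print_args(x, y):
    print(x)
    print(y)
    return x + y


# Each dict corresponds to one mapped task instance.
expanded = [
    {"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
    {"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
]
# Unpack op_kwargs into the callable, one call per "task instance".
results = [print_args(**spec["op_kwargs"]) for spec in expanded]
```

Run against the two dicts above, `results` holds the two return values, mirroring the two task instances the doc example describes.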
 Similar to ``expand``, you can also map against an XCom that returns a list of dicts, or a list of XComs each returning a dict. Re-using the S3 example above, you can use a mapped task to perform "branching" and copy files to different buckets:
 
 .. code-block:: python



Re: [PR] doc: dynamictaskmapping pythonoperator op_kwargs [airflow]

2024-04-24 Thread via GitHub


uranusjr merged PR #39242:
URL: https://github.com/apache/airflow/pull/39242





Re: [PR] Resolve deprecations in Python sensors/operators/decorators [airflow]

2024-04-24 Thread via GitHub


uranusjr commented on code in PR #39241:
URL: https://github.com/apache/airflow/pull/39241#discussion_r1578832517


##
tests/deprecations_ignore.yml:
##
@@ -349,23 +313,8 @@
 - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_execute_on_failure_callbacks_without_dag
 - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_failure_callbacks_should_not_drop_hostname
 - tests/dag_processing/test_processor.py::TestDagFileProcessor::test_process_file_should_failure_callback
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_fail_multiple_outputs_key_type
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_fail_multiple_outputs_no_dict
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_manual_multiple_outputs_false_with_typings
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_empty_dict
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_ignore_typing
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_return_none
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_python_callable_arguments_are_templatized
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_python_callable_keyword_arguments_are_templatized
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_xcom_arg
+# Inlets/Outlets use TaskInstance.xcom_push() with deprecated arguments in airflow/lineage/__init__.py

Review Comment:
   Why not fix these then? They seem pretty trivial.






Re: [PR] Add retry logic for KubernetesCreateResourceOperator and KubernetesJobOperator [airflow]

2024-04-24 Thread via GitHub


dirrao commented on code in PR #39201:
URL: https://github.com/apache/airflow/pull/39201#discussion_r1578826472


##
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##
@@ -486,6 +488,12 @@ def get_deployment_status(
         except Exception as exc:
             raise exc
 
+    @tenacity.retry(

Review Comment:
   This will be retried for non-transient errors as well. Can't we rely on the task retry instead of an explicit retry here?
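One way to address this concern is to retry only errors known to be transient and let everything else propagate immediately to the task-level retry policy. A hand-rolled sketch of that idea (the exception class and helper names are illustrative, not from the PR, and a real implementation would likely use `tenacity.retry_if_exception_type` instead):

```python
import time


class TransientError(Exception):
    """Stand-in for a retryable failure, e.g. a 5xx API response."""


def retry_transient(fn, attempts=3, delay=0.0):
    # Retry only TransientError; any other exception propagates at once,
    # leaving permanent failures to the task-level retry policy.
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == attempts:
                raise
            time.sleep(delay)


calls = {"n": 0}


def flaky():
    # Fails transiently twice, then succeeds on the third call.
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError()
    return "ok"
```

The key design point is the narrow `except` clause: broad `retry` decorators, as written in the hunk above, will also retry permanent errors such as authorization failures.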






Re: [PR] fix: sqa deprecations for airflow core [airflow]

2024-04-24 Thread via GitHub


uranusjr commented on code in PR #39211:
URL: https://github.com/apache/airflow/pull/39211#discussion_r1578826032


##
airflow/www/views.py:
##
@@ -3294,7 +3292,7 @@ def next_run_datasets(self, dag_id):
         latest_run = dag_model.get_last_dagrun(session=session)
 
         events = [
-            dict(info)
+            dict(info._mapping)

Review Comment:
   I assume this attribute is officially supported, although it looks private.
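For reference, `Row._mapping` is indeed the documented dict-like accessor on result rows in SQLAlchemy 1.4+, despite the leading underscore; iterating a `Row` directly yields values, so plain `dict(row)` no longer works under 2.0. A small demonstration (assumes SQLAlchemy 1.4 or newer is installed):

```python
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
with engine.connect() as conn:
    row = conn.execute(sa.text("SELECT 1 AS uri, 'x' AS extra")).first()

# Row._mapping is the supported mapping view of a Row (keyed by column
# label), which is what the diff above switches to.
event = dict(row._mapping)
```

The underscore marks it as off-limits for subclassing conventions, not as private API.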






Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]

2024-04-24 Thread via GitHub


dirrao commented on code in PR #39177:
URL: https://github.com/apache/airflow/pull/39177#discussion_r1578824340


##
airflow/utils/log/file_task_handler.py:
##
@@ -366,10 +366,7 @@ def _read(
         executor_messages: list[str] = []
         executor_logs: list[str] = []
         served_logs: list[str] = []
-        is_running = ti.try_number == try_number and ti.state in (
-            TaskInstanceState.RUNNING,
-            TaskInstanceState.DEFERRED,

Review Comment:
   When the task is deferred, are the logs served from the triggerer?






Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]

2024-04-24 Thread via GitHub


jedcunningham commented on PR #38876:
URL: https://github.com/apache/airflow/pull/38876#issuecomment-2076305345

   Currently downgrade also doesn't work when you have triggers - alembic tries to switch the column back to JSON before decryption happens.





Re: [PR] added a new condition before launching the self._scan_stale_dags() fu… [airflow]

2024-04-24 Thread via GitHub


uranusjr commented on code in PR #39159:
URL: https://github.com/apache/airflow/pull/39159#discussion_r1578810984


##
docs/apache-airflow/core-concepts/dags.rst:
##
@@ -924,3 +924,10 @@ if it fails for ``N`` number of times consecutively.
 we can also provide and override these configuration from DAG argument:
 
 - ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
+
+Disable deletion of stale dags

Review Comment:
   ```suggestion
   Disable deletion of stale DAGs
   ```






Re: [PR] added a new condition before launching the self._scan_stale_dags() fu… [airflow]

2024-04-24 Thread via GitHub


uranusjr commented on code in PR #39159:
URL: https://github.com/apache/airflow/pull/39159#discussion_r1578810901


##
airflow/dag_processing/manager.py:
##
@@ -599,7 +599,10 @@ def _run_parsing_loop(self):
 
         if self.standalone_dag_processor:
             self._fetch_callbacks(max_callbacks_per_loop)
-            self._scan_stale_dags()
+
+        # this variable gives us flexibility to purge stale dags or not.

Review Comment:
   This is obvious from the actual logic; the comment is quite redundant.






Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]

2024-04-24 Thread via GitHub


uranusjr commented on code in PR #39244:
URL: https://github.com/apache/airflow/pull/39244#discussion_r1578810106


##
airflow/cli/commands/task_command.py:
##
@@ -199,6 +199,8 @@ def _get_ti_db_access(
     )
     # TODO: Validate map_index is in range?
     ti = TaskInstance(task, run_id=dag_run.run_id, map_index=map_index)
+    if create_if_necessary == "db":
+        session.add(ti)

Review Comment:
   Would this not just delay this until we need to handle `create_if_necessary == "memory"`?






[PR] Amazon Bedrock - Knowledge Bases and Data Sources [airflow]

2024-04-24 Thread via GitHub


ferruzzi opened a new pull request, #39245:
URL: https://github.com/apache/airflow/pull/39245

   Adds hooks, operators, sensors, triggers, and waiters to enable support for Amazon Bedrock Knowledge Bases. This includes Bedrock Data Sources and Ingestion Jobs, and some light additions (a thin hook, and a waiter/sensor/trigger set) for Amazon OpenSearch Serverless, which are needed to make the system test work end-to-end.
   
   System tests and unit tests are included, as well as docs. Static checks, unit tests, and build-docs all pass locally.
   
   
   
   
   
   
   
   
   
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).





Re: [PR] Add tests for EmrServerlessJobSensor [airflow]

2024-04-24 Thread via GitHub


mateuslatrova commented on PR #39099:
URL: https://github.com/apache/airflow/pull/39099#issuecomment-2076129433

   Hi @dirrao !
   
   Just fixed the tests for EmrServerlessJobSensor; take a look whenever you can.
   
   I decided to do a separate PR with the tests for the EmrServerlessApplicationSensor.
   
   Thanks in advance!





Re: [PR] Added JOB_STATE_CANCELLED and pool_sleep GCP Dataflow Operators [airflow]

2024-04-24 Thread via GitHub


github-actions[bot] closed pull request #37364: Added JOB_STATE_CANCELLED and pool_sleep GCP Dataflow Operators
URL: https://github.com/apache/airflow/pull/37364





Re: [PR] feat/log null bytes file [airflow]

2024-04-24 Thread via GitHub


github-actions[bot] closed pull request #37894: feat/log null bytes file
URL: https://github.com/apache/airflow/pull/37894





Re: [PR] Raising Exception if dag is not in active state while triggering dag [airflow]

2024-04-24 Thread via GitHub


github-actions[bot] commented on PR #37836:
URL: https://github.com/apache/airflow/pull/37836#issuecomment-2076076835

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]

2024-04-24 Thread via GitHub


dstandish commented on PR #38876:
URL: https://github.com/apache/airflow/pull/38876#issuecomment-2076067502

   > > Should we consider yanking 2.9.0 once 2.9.1 is out with this fix?
   > 
   > I don't think so. In my point of view, if we wanted to yank then we should have merged this quickly and cut 2.9.1 immediately as a critical bug fix. I asked about this when the issue was discovered, but others thought we should wait for more bug fixes as this was not categorized as critical.
   
   To me, I'm not sure whether it meets the criteria of critical, but it's pretty bad. If the user is using the Helm chart, then it will run the migration a lot, and if there are triggers in flight, this could happen and sort of bork the cluster.





Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]

2024-04-24 Thread via GitHub


jedcunningham commented on PR #38876:
URL: https://github.com/apache/airflow/pull/38876#issuecomment-2076010902

   Just confirmed my suspicions, it doesn't work for offline migrations:
   
   ```
   [2024-04-24T23:08:17.337+] {triggerer_job_runner.py:341} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 339, in _execute
       self._run_trigger_loop()
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 362, in _run_trigger_loop
       self.load_triggers()
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 377, in load_triggers
       self.trigger_runner.update_triggers(set(ids))
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 676, in update_triggers
       new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 93, in kwargs
       return self._decrypt_kwargs(self.encrypted_kwargs)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 119, in _decrypt_kwargs
       decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
     File "/home/airflow/.local/lib/python3.12/site-packages/cryptography/fernet.py", line 211, in decrypt
       raise InvalidToken
   cryptography.fernet.InvalidToken
   ```
   
   This was all that was spit out for that migration:
   
   ```
   -- Running upgrade ee1467d4aa35 -> 1949afb29106
   
   ALTER TABLE trigger ALTER COLUMN kwargs TYPE TEXT;
   
   UPDATE alembic_version SET version_num='1949afb29106' WHERE alembic_version.version_num = 'ee1467d4aa35';
   ```





Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]

2024-04-24 Thread via GitHub


jedcunningham commented on PR #38876:
URL: https://github.com/apache/airflow/pull/38876#issuecomment-2075998782

   Ignoring that the conditional is wrong, I don't follow how this migration approach works for offline migration in the first place. Am I missing functionality somewhere that solves that scenario?





[PR] fix: sqa deprecations for airflow task cmd [airflow]

2024-04-24 Thread via GitHub


dondaum opened a new pull request, #39244:
URL: https://github.com/apache/airflow/pull/39244

   
   
   
   
   related: #28723
   
   Fix deprecations for SQLAlchemy 2.0 in the Airflow core task command.
   
   SQLAlchemy 2.0 changes the behavior when an object is merged into a Session along the backref cascade. Until SQLAlchemy 1.4, and assuming a bidirectional relationship between a TaskInstance and a DagRun, if a DagRun object is already in a Session, the TaskInstance object gets put into the Session as well. This behavior is deprecated for [removal](https://docs.sqlalchemy.org/en/20/changelog/migration_14.html#cascade-backrefs-behavior-deprecated-for-removal-in-2-0) in SQLAlchemy 2.0.
   
   In order to maintain the actual behavior and to fix the warning, we need to ensure that both objects are either not in the session, or are both in the session, when they are associated with each other. See [here](https://github.com/sqlalchemy/sqlalchemy/discussions/7693) for more information.
   
   ### Reported in core
   - [x] airflow/cli/commands/task_command.py:202
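   A minimal sketch of the pattern the PR describes, using toy models rather than Airflow's (assumes SQLAlchemy 1.4+): add both related objects to the session explicitly instead of relying on the backref cascade.

   ```python
   import sqlalchemy as sa
   from sqlalchemy.orm import declarative_base, relationship, sessionmaker

   Base = declarative_base()


   class DagRun(Base):
       __tablename__ = "dag_run"
       id = sa.Column(sa.Integer, primary_key=True)


   class TaskInstance(Base):
       __tablename__ = "task_instance"
       id = sa.Column(sa.Integer, primary_key=True)
       dag_run_id = sa.Column(sa.ForeignKey("dag_run.id"))
       dag_run = relationship(DagRun, backref="task_instances")


   engine = sa.create_engine("sqlite://")
   Base.metadata.create_all(engine)
   session = sessionmaker(bind=engine)()

   run = DagRun()
   session.add(run)
   ti = TaskInstance(dag_run=run)
   # Under SQLAlchemy 2.0 the backref cascade no longer pulls `ti` into the
   # session via `run`, so add it explicitly before committing.
   session.add(ti)
   session.commit()
   ```

   The explicit `session.add(ti)` is harmless on 1.4 (where the cascade would have added it anyway) and required for the 2.0 behavior, which is exactly what makes it a safe deprecation fix.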
   
   
   
   
   





Re: [I] KubernetesPodOperator duplicating logs when interrupted [airflow]

2024-04-24 Thread via GitHub


tirkarthi commented on issue #39236:
URL: https://github.com/apache/airflow/issues/39236#issuecomment-2075830089

   Related https://github.com/apache/airflow/issues/33498





Re: [I] front - admin menu - drop-down non deterministic [airflow]

2024-04-24 Thread via GitHub


jscheffl commented on issue #39135:
URL: https://github.com/apache/airflow/issues/39135#issuecomment-2075813598

   @potiuk Will try a regression test in the next days; might need a moment. But we are also using OAuth, and it seems very probable that the correction fixes it... THANKS!





Re: [PR] Make audit log before/after filterable [airflow]

2024-04-24 Thread via GitHub


jscheffl commented on PR #39120:
URL: https://github.com/apache/airflow/pull/39120#issuecomment-2075804286

   > > I tried to "use" the audit log and either I am too stupid (I have rarely 
clicked there, maybe I'm a noob in this) but:
   > > 
   > > * When clicking in one of the date fields and picking a date, the filter 
is not applied.
   > > * Clicking or pressing enter in a different area clears the previously 
selected data.
   > > * I tried multiple include/exclude filter options; using include always 
generated empty results, and exclude had no effect. Somehow I was not able to 
filter, and I did not understand what I can filter for.
   > > 
   > > Might it be that with the changes something is broken? Or is my browser 
failing? Ubuntu 22.04 x64, Firefox.
   > 
   > Something like selecting a Before date and adding an event name doesn't 
work for you? https://private-user-images.githubusercontent.com/4600967/324977078-c06c0422-211c-4469-9cb6-2b6d9f2e6abf.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTM5OTA4MjEsIm5iZiI6MTcxMzk5MDUyMSwicGF0aCI6Ii80NjAwOTY3LzMyNDk3NzA3OC1jMDZjMDQyMi0yMTFjLTQ0NjktOWNiNi0yYjZkOWYyZTZhYmYucG5nP1gtQW16LUFsZ29yaXRobT1BV1M0LUhNQUMtU0hBMjU2JlgtQW16LUNyZWRlbnRpYWw9QUtJQVZDT0RZTFNBNTNQUUs0WkElMkYyMDI0MDQyNCUyRnVzLWVhc3QtMSUyRnMzJTJGYXdzNF9yZXF1ZXN0JlgtQW16LURhdGU9MjAyNDA0MjRUMjAyODQxWiZYLUFtei1FeHBpcmVzPTMwMCZYLUFtei1TaWduYXR1cmU9ZDc2Njg3YTI1MTkxODg2MWI5YmJkMGYwYWI1YjhjYTE2OGMyODA5ZjRmNmJjMmM2YTIzOTZhZDE4ZTgyNTEyMiZYLUFtei1TaWduZWRIZWFkZXJzPWhvc3QmYWN0b3JfaWQ9MCZrZXlfaWQ9MCZyZXBvX2lkPTAifQ.OqQj40YShM6ntFme2OIWR7QXjWV8RjhNXKru2YI7w8M;>
   
   Did a regression test on my Ubuntu 20.04 x64 LTS:
   - Firefox
 - Both Start/End date are _not_ working. I can use the date picker but 
upon exit of the popup, date / time selection is reset
 - Event filter is working (when entering text, but no suggestions in the 
popup below, do I need to expect anything?)
   - Chromium
 - Date start/end selection is working.
 - Event filter is working (when entering text, but no suggestions in the 
popup below, do I need to expect anything?)
   
   UPDATE! I believe I found the root cause... in Firefox it looks like:
   
![image](https://github.com/apache/airflow/assets/95105677/86a72a8c-09a3-49d2-a34b-9bc40f80b4be)
   --> The DATE popup only shows a date selection. After clicking on a date, the 
time fields are filled with --:--:--, and on exit the date selection is 
reset. If I change the time before leaving focus, the filter is applied.
   
   The difference in Chromium: the popup shows date AND time:
   
![image](https://github.com/apache/airflow/assets/95105677/e7bf200a-1b55-4821-92e1-a5d357f3d968)
   





Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


vincbeck commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1578495743


##
airflow/providers/celery/executors/celery_executor.py:
##
@@ -368,8 +368,10 @@ def update_all_task_states(self) -> None:
 if state:
 self.update_task_state(key, state, info)
 
-def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, 
info=None) -> None:
-super().change_state(key, state, info)
+def change_state(
+self, key: TaskInstanceKey, state: TaskInstanceState, info=None, 
remove_running=True
+) -> None:
+super().change_state(key, state, info, remove_running=remove_running)

Review Comment:
   Same here no? If Airflow version is less than 2.10, it will not recognize 
`remove_running`?






Re: [I] Dynamic mapped tasks group arguments are interpreted as MappedArgument when provided to classic operators [airflow]

2024-04-24 Thread via GitHub


florian-guily commented on issue #39222:
URL: https://github.com/apache/airflow/issues/39222#issuecomment-2075777459

   > @florian-guily  Would you like to be assigned to this issue?
   
   I'd like to, yes. I'll find the time to resolve it!





Re: [PR] [DRAFT] Switch to Connexion 3 framework [airflow]

2024-04-24 Thread via GitHub


potiuk commented on code in PR #39055:
URL: https://github.com/apache/airflow/pull/39055#discussion_r1578443041


##
airflow/www/app.py:
##
@@ -192,3 +209,8 @@ def purge_cached_app():
 """Remove the cached version of the app in global state."""
 global app
 app = None
+
+
+def cached_flask_app(config=None, testing=False):

Review Comment:
   Replaced with both app cached.






Re: [PR] [DRAFT] Switch to Connexion 3 framework [airflow]

2024-04-24 Thread via GitHub


potiuk commented on code in PR #39055:
URL: https://github.com/apache/airflow/pull/39055#discussion_r1578442260


##
airflow/www/package.json:
##
@@ -141,7 +141,8 @@
 "reactflow": "^11.7.4",
 "redoc": "^2.0.0-rc.72",
 "remark-gfm": "^3.0.1",
-"swagger-ui-dist": "4.1.3",
+"sanitize-html": "^2.12.1",

Review Comment:
   Removed.






(airflow) branch constraints-main updated: Updating constraints. Github run id:8821161824

2024-04-24 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch constraints-main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/constraints-main by this push:
 new 1376aa1294 Updating constraints. Github run id:8821161824
1376aa1294 is described below

commit 1376aa129487513f871f7ec1011dd4c3ff97a965
Author: Automated GitHub Actions commit 
AuthorDate: Wed Apr 24 19:41:22 2024 +

Updating constraints. Github run id:8821161824

This update in constraints is automatically committed by the CI 
'constraints-push' step based on
'refs/heads/main' in the 'apache/airflow' repository with commit sha 
9619536e6f1f5737d56d2ef761c2e4467f17cd4e.

The action that build those constraints can be found at 
https://github.com/apache/airflow/actions/runs/8821161824/

The image tag used for that build was: 
9619536e6f1f5737d56d2ef761c2e4467f17cd4e. You can enter Breeze environment
with this image by running 'breeze shell --image-tag 
9619536e6f1f5737d56d2ef761c2e4467f17cd4e'

All tests passed in this build so we determined we can push the updated 
constraints.

See 
https://github.com/apache/airflow/blob/main/README.md#installing-from-pypi for 
details.
---
 constraints-3.10.txt  | 15 +++
 constraints-3.11.txt  | 15 +++
 constraints-3.12.txt  | 15 +++
 constraints-3.8.txt   | 15 +++
 constraints-3.9.txt   | 15 +++
 constraints-no-providers-3.10.txt |  4 ++--
 constraints-no-providers-3.11.txt |  4 ++--
 constraints-no-providers-3.12.txt |  4 ++--
 constraints-no-providers-3.8.txt  |  4 ++--
 constraints-no-providers-3.9.txt  |  4 ++--
 constraints-source-providers-3.10.txt | 15 +++
 constraints-source-providers-3.11.txt | 15 +++
 constraints-source-providers-3.12.txt | 15 +++
 constraints-source-providers-3.8.txt  | 15 +++
 constraints-source-providers-3.9.txt  | 15 +++
 15 files changed, 80 insertions(+), 90 deletions(-)

diff --git a/constraints-3.10.txt b/constraints-3.10.txt
index 1d3934447d..78dd283592 100644
--- a/constraints-3.10.txt
+++ b/constraints-3.10.txt
@@ -1,6 +1,6 @@
 
 #
-# This constraints file was automatically generated on 
2024-04-23T20:36:37.772543
+# This constraints file was automatically generated on 
2024-04-24T18:26:41.767946
 # via "eager-upgrade" mechanism of PIP. For the "main" branch of Airflow.
 # This variant of constraints install uses the HEAD of the branch version for 
'apache-airflow' but installs
 # the providers from PIP-released packages at the moment of the constraint 
generation.
@@ -229,7 +229,7 @@ bcrypt==4.1.2
 beautifulsoup4==4.12.3
 billiard==4.2.0
 bitarray==2.9.2
-black==24.4.0
+black==24.4.1
 blinker==1.7.0
 boto3==1.34.69
 botocore==1.34.69
@@ -283,7 +283,6 @@ docopt==0.6.2
 docstring_parser==0.16
 docutils==0.16
 duckdb==0.10.2
-editables==0.5
 elastic-transport==8.13.0
 elasticsearch==8.13.0
 email_validator==2.1.1
@@ -375,8 +374,8 @@ gssapi==1.8.3
 gunicorn==22.0.0
 h11==0.14.0
 h2==4.1.0
-hatch==1.9.4
-hatchling==1.21.1
+hatch==1.9.6
+hatchling==1.24.2
 hdfs==2.7.3
 hmsclient==0.1.1
 hpack==4.0.0
@@ -480,7 +479,7 @@ nodeenv==1.8.0
 numpy==1.26.4
 oauthlib==3.2.2
 objsize==0.7.0
-openai==1.23.2
+openai==1.23.3
 openapi-schema-validator==0.6.2
 openapi-spec-validator==0.7.1
 openlineage-integration-common==1.12.0
@@ -724,11 +723,11 @@ uv==0.1.35
 validators==0.28.1
 vertica-python==1.3.8
 vine==5.1.0
-virtualenv==20.26.0
+virtualenv==20.25.3
 watchtower==3.2.0
 wcwidth==0.2.13
 weaviate-client==3.26.2
-websocket-client==1.7.0
+websocket-client==1.8.0
 wirerope==0.4.7
 wrapt==1.16.0
 xmlsec==1.3.13
diff --git a/constraints-3.11.txt b/constraints-3.11.txt
index f5bf50aa28..5265c11aae 100644
--- a/constraints-3.11.txt
+++ b/constraints-3.11.txt
@@ -1,6 +1,6 @@
 
 #
-# This constraints file was automatically generated on 
2024-04-23T20:36:37.241476
+# This constraints file was automatically generated on 
2024-04-24T18:26:42.167347
 # via "eager-upgrade" mechanism of PIP. For the "main" branch of Airflow.
 # This variant of constraints install uses the HEAD of the branch version for 
'apache-airflow' but installs
 # the providers from PIP-released packages at the moment of the constraint 
generation.
@@ -228,7 +228,7 @@ bcrypt==4.1.2
 beautifulsoup4==4.12.3
 billiard==4.2.0
 bitarray==2.9.2
-black==24.4.0
+black==24.4.1
 blinker==1.7.0
 boto3==1.34.69
 botocore==1.34.69
@@ -282,7 +282,6 @@ docopt==0.6.2
 docstring_parser==0.16
 docutils==0.16
 duckdb==0.10.2
-editables==0.5
 elastic-transport==8.13.0
 elasticsearch==8.13.0
 email_validator==2.1.1
@@ -373,8 +372,8 @@ gssapi==1.8.3
 gunicorn==22.0.0
 h11==0.14.0
 h2==4.1.0
-hatch==1.9.4
-hatchling==1.21.1
+hatch==1.9.6
+hatchling==1.24.2
 hdfs==2.7.3

Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]

2024-04-24 Thread via GitHub


RNHTTR commented on PR #39177:
URL: https://github.com/apache/airflow/pull/39177#issuecomment-2075686454

   I can't resolve my conversations for some reason... maybe because of some git 
stuff, as it's only showing one commit? Either way, my comments were addressed.





[PR] Power BI Provider [airflow]

2024-04-24 Thread via GitHub


ambika-garg opened a new pull request, #39243:
URL: https://github.com/apache/airflow/pull/39243

   # Apache Airflow Provider for Power BI.
   
   ## Operators
   ### PowerBIDatasetRefreshOperator
   The operator triggers the Power BI dataset refresh and pushes the details of 
the refresh to XCom. It can accept the following parameters:
   
   * `dataset_id`: The dataset Id.
   * `group_id`: The workspace Id.
   * `wait_for_termination`: (Default value: True) Wait until the pre-existing 
or current triggered refresh completes before exiting.
   * `force_refresh`: When enabled, it will force a refresh of the dataset again 
after the pre-existing ongoing refresh request is terminated.
   * `timeout`: Time in seconds to wait for a dataset to reach a terminal 
status for non-asynchronous waits. Used only if ``wait_for_termination`` is 
True.
   * `check_interval`: Number of seconds to wait before rechecking the refresh 
status.
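
For illustration only, the `wait_for_termination` / `timeout` / `check_interval` parameters describe a simple poll-until-terminal-state loop. A minimal sketch in plain Python, with hypothetical function and state names (the `Completed`/`Failed`/`Disabled`/`Unknown` states are the ones listed below; the `wait_for_refresh` helper is not part of the provider's API):

```python
import time

# Terminal refresh states, per the operator description.
TERMINAL_STATES = {"Completed", "Failed", "Disabled"}

def wait_for_refresh(get_status, timeout=60, check_interval=5):
    """Poll get_status() until a terminal state is reached or `timeout`
    seconds elapse, re-checking every `check_interval` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(check_interval)
    raise TimeoutError("refresh did not reach a terminal state in time")

# Example: the refresh completes on the second poll.
states = iter(["Unknown", "Completed"])
result = wait_for_refresh(lambda: next(states), timeout=10, check_interval=0)
print(result)  # Completed
```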
   
   ## Hooks
   ### PowerBI Hook
   A hook to interact with Power BI.
   * `powerbi_conn_id`: Airflow Connection ID that contains the connection 
information for the Power BI account used for authentication.
   
   ## Features
   *  Xcom Integration: The Power BI Dataset refresh operator enriches the 
Xcom with essential fields for downstream tasks:
   1. `powerbi_dataset_refresh_id`: Request Id of the Dataset Refresh.
   2. `powerbi_dataset_refresh_status`: Refresh Status.
   * `Unknown`: Refresh state is unknown or a refresh is in progress.
   * `Completed`: Refresh successfully completed.
   * `Failed`: Refresh failed (details in `powerbi_dataset_refresh_error`).
   * `Disabled`: Refresh is disabled by a selective refresh.
   3. `powerbi_dataset_refresh_end_time`: The end date and time of the refresh 
(may be None if a refresh is in progress)
   4. `powerbi_dataset_refresh_error`: Failure error code in JSON format (None 
if no error)
   
   *  External Monitoring link: The operator conveniently provides a 
redirect link to the Power BI UI for monitoring refreshes.
   
   ## Sample DAG to use the provider.
   
   Check out the sample DAG code below:
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from operators.powerbi_refresh_dataset_operator import 
PowerBIDatasetRefreshOperator
   
   
   with DAG(
   dag_id='refresh_dataset_powerbi',
   schedule_interval=None,
   start_date=datetime(2023, 8, 7),
   catchup=False,
   concurrency=20,
   tags=['powerbi', 'dataset', 'refresh']
   ) as dag:
   
   refresh_in_given_workspace = PowerBIDatasetRefreshOperator(
   task_id="refresh_in_given_workspace",
    dataset_id=...,  # value truncated in the original message
    )
   ```
   
   
   
   
   
   





Re: [I] Dynamic mapped tasks group arguments are interpreted as MappedArgument when provided to classic operators [airflow]

2024-04-24 Thread via GitHub


RNHTTR commented on issue #39222:
URL: https://github.com/apache/airflow/issues/39222#issuecomment-2075666741

   @florian-guily  Would you like to be assigned to this issue?





Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]

2024-04-24 Thread via GitHub


eladkal commented on PR #38876:
URL: https://github.com/apache/airflow/pull/38876#issuecomment-2075624714

   > Should we consider yanking 2.9.0 once 2.9.1 is out with this fix?
   
   I don't think so.
   In my view, if we wanted to yank, then we should have merged this 
quickly and cut 2.9.1 immediately as a critical bug fix. I asked about this when 
the issue was discovered, but others thought we should wait for more bug fixes, as 
this was not categorized as critical.





Re: [PR] Run system tests using `dag.test()` [airflow]

2024-04-24 Thread via GitHub


potiuk commented on PR #39176:
URL: https://github.com/apache/airflow/pull/39176#issuecomment-2075562155

   It's a completely different mechanism - it just runs `_run_raw_task` for all 
tasks according to dependencies - look at the code. 
   
   If you want to run it with a different executor, what you really need is to 
run the Airflow Scheduler with that executor, and you need to trigger the DAG (via 
API or CLI). You should not really use pytest for that.  Previously, backfill 
was used to run the dags (this is how Debug Executor worked) - but that was a 
feature of Debug Executor - if you have the ECS executor, then having the scheduler 
run the DAG is probably much better.





Re: [PR] Run system tests using `dag.test()` [airflow]

2024-04-24 Thread via GitHub


vincbeck commented on PR #39176:
URL: https://github.com/apache/airflow/pull/39176#issuecomment-2075546584

   Question on this one though. The doc says `dag.test()` does not use 
an executor at all, so it is not possible to configure it to use a 
specific executor. I had in mind possibly running some system tests with a 
different executor (e.g. ECSExecutor). How hard would that be?





Re: [PR] Deprecation of AutoML services: Add deprecation warnings and raise exceptions for already deprecated ones [airflow]

2024-04-24 Thread via GitHub


potiuk commented on PR #38673:
URL: https://github.com/apache/airflow/pull/38673#issuecomment-2075537682

   cc: @VladaZakharova - there were some doubts from the Google team about those 
deprecations - does this one look OK to you?





Re: [PR] Allow trust env parameter to be defined in extra options of HTTP Connection [airflow]

2024-04-24 Thread via GitHub


potiuk merged PR #39161:
URL: https://github.com/apache/airflow/pull/39161





(airflow) branch main updated (12ce2dcd46 -> 9619536e6f)

2024-04-24 Thread potiuk
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 12ce2dcd46 Retry merge commit retrieval on failure (#39231)
 add 9619536e6f Allow trust env parameter to be defined in extra options of 
HTTP Connection (#39161)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/http/hooks/http.py|  4 
 tests/providers/http/hooks/test_http.py | 23 +++
 2 files changed, 27 insertions(+)



Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578308432


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+try:
+while True:
+cluster = await self.fetch_cluster()
+state = cluster.status.state
+if state == ClusterStatus.State.ERROR:
+await self.delete_when_error_occurred(cluster)
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": ClusterStatus.State.DELETING,
+"cluster": cluster,
+}
+)
+return
+elif state == ClusterStatus.State.RUNNING:
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": state,
+"cluster": cluster,
+}
+)
+return
+self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
+await asyncio.sleep(self.polling_interval_seconds)
+except asyncio.CancelledError:
+try:
+if self.delete_on_error:
+self.log.info("Deleting cluster %s.", self.cluster_name)
+# The synchronous hook is utilized to delete the cluster 
when a task is cancelled.
+# This is because the asynchronous hook deletion is not 
awaited when the trigger task
+# is cancelled. The call for deleting the cluster through 
the sync hook is not a blocking
+# call, which means it does not wait until the cluster is 
deleted.
+self.get_sync_hook().delete_cluster(
+region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+)
+self.log.info("Deleted cluster %s during cancellation.", 
self.cluster_name)
+except Exception as e:
+self.log.error("Error during cancellation handling: %s", e)
+raise AirflowException("Error during cancellation handling: 
%s", e)
+
+async def fetch_cluster(self) -> Cluster:
+"""Fetch the cluster status."""
+return await self.get_async_hook().get_cluster(
+project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+)
+
+async def delete_when_error_occurred(self, cluster: Cluster):
+"""
+Delete the cluster on error.
+
+:param cluster: The cluster to delete.
+"""
+if self.delete_on_error:
+self.log.info("Deleting cluster %s.", self.cluster_name)
+await self.get_async_hook().delete_cluster(
+region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+)
+self.log.info("Cluster %s has been deleted.", self.cluster_name)
+else:
+self.log.info(
+"Cluster %s is not be deleted as delete_on_error is set to 
False.", self.cluster_name

Review Comment:
   ```suggestion
   "Cluster %s is not deleted as delete_on_error is set to 
False.", self.cluster_name
   ```






Re: [PR] Bump minimum Airflow version in providers to Airflow 2.7.0 [airflow]

2024-04-24 Thread via GitHub


potiuk commented on code in PR #39240:
URL: https://github.com/apache/airflow/pull/39240#discussion_r1578307454


##
dev/breeze/src/airflow_breeze/global_constants.py:
##
@@ -473,10 +473,8 @@ def _exclusion(providers: Iterable[str]) -> str:
 BASE_PROVIDERS_COMPATIBILITY_CHECKS: list[dict[str, str]] = [
 {
 "python-version": "3.8",
-"airflow-version": "2.6.0",
-"remove-providers": _exclusion(
-["openlineage", "common.io", "cohere", "fab", "qdrant", 
"microsoft.azure"]
-),
+"airflow-version": "2.7.0",

Review Comment:
   You should just remove 2.7.0 -> there is a small incompatibility in the 
cohere provider with importlib that makes it technically `2.7.1`-compatible, 
not 2.7.0 - that's why 2.7.1 is added below. I think it's not worth 
complicating things by additionally excluding `cohere` for 2.7.0 - technically 
there is a very small difference between 2.7.0 and 2.7.1, so I'd rather leave 
2.7.1.






Re: [PR] Resolve deprecations in Python sensors/operators/decorators [airflow]

2024-04-24 Thread via GitHub


Taragolis commented on code in PR #39241:
URL: https://github.com/apache/airflow/pull/39241#discussion_r1578300862


##
tests/deprecations_ignore.yml:
##
@@ -349,23 +313,8 @@
 - 
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_execute_on_failure_callbacks_without_dag
 - 
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_failure_callbacks_should_not_drop_hostname
 - 
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_process_file_should_failure_callback
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_fail_multiple_outputs_key_type
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_fail_multiple_outputs_no_dict
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_manual_multiple_outputs_false_with_typings
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_empty_dict
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_ignore_typing
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_multiple_outputs_return_none
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_python_callable_arguments_are_templatized
-- 
tests/decorators/test_python.py::TestAirflowTaskDecorator::test_python_callable_keyword_arguments_are_templatized
-- tests/decorators/test_python.py::TestAirflowTaskDecorator::test_xcom_arg
+# Inlets/Outlets use TaskInstance.xcom_push() with deprecated arguments in 
airflow/lineage/__init__.py

Review Comment:
   
https://github.com/apache/airflow/blob/c6bc0529805be98cffbf336070abee32b93ca39a/airflow/lineage/__init__.py#L84-L92
   
   ```console
   ERRORairflow.task:taskinstance.py:2974 Task failed with exception
   Traceback (most recent call last):
 File "/opt/airflow/airflow/models/taskinstance.py", line 2550, in 
_run_raw_task
   self._execute_task_with_callbacks(context, test_mode, session=session)
 File "/opt/airflow/airflow/models/taskinstance.py", line 2756, in 
_execute_task_with_callbacks
   self.task.post_execute(context=context, result=result)
 File "/opt/airflow/airflow/lineage/__init__.py", line 85, in wrapper
   self.xcom_push(
 File "/opt/airflow/airflow/models/baseoperator.py", line 1590, in xcom_push
   context["ti"].xcom_push(key=key, value=value, 
execution_date=execution_date)
 File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
   return func(*args, session=session, **kwargs)
 File "/opt/airflow/airflow/models/taskinstance.py", line 3270, in xcom_push
   warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)
   airflow.exceptions.RemovedInAirflow3Warning: Passing 'execution_date' to 
'TaskInstance.xcom_push()' is deprecated.
   ```






[PR] Resolve deprecations in Python sensors/operators/decorators [airflow]

2024-04-24 Thread via GitHub


Taragolis opened a new pull request, #39241:
URL: https://github.com/apache/airflow/pull/39241

   
   
   
   
   related: https://github.com/apache/airflow/issues/38642
   
   





(airflow) branch main updated: Retry merge commit retrieval on failure (#39231)

2024-04-24 Thread potiuk
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 12ce2dcd46 Retry merge commit retrieval on failure (#39231)
12ce2dcd46 is described below

commit 12ce2dcd46f6ad59e0a03925654a92126c2e5164
Author: Jarek Potiuk 
AuthorDate: Wed Apr 24 19:49:22 2024 +0200

Retry merge commit retrieval on failure (#39231)

    Sometimes when merge commit retrieval fails (most likely because of a race
    condition, as the pull request event is not ready yet) build-images builds
    all images, because selective check does not know the merge commit (on
    pull request target build image failure). That was the only side effect,
    because otherwise build image had a fallback to
    github.event.pull_request.head.sha in this case.
---
 .github/workflows/build-images.yml | 19 ++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/build-images.yml 
b/.github/workflows/build-images.yml
index 9c732d1166..bd10e73aac 100644
--- a/.github/workflows/build-images.yml
+++ b/.github/workflows/build-images.yml
@@ -91,8 +91,25 @@ jobs:
   - name: Discover PR merge commit
 id: discover-pr-merge-commit
 run: |
+  # Sometimes target-commit-sha cannot be
   TARGET_COMMIT_SHA="$(gh api '${{ github.event.pull_request.url }}' 
--jq .merge_commit_sha)"
-  echo "TARGET_COMMIT_SHA=$TARGET_COMMIT_SHA" >> ${GITHUB_ENV}
+  if [[ ${TARGET_COMMIT_SHA} == "" ]]; then
+# Sometimes retrieving the merge commit SHA from PR fails. We 
retry it once. Otherwise we
+# fall-back to github.event.pull_request.head.sha
+echo
+echo "Could not retrieve merge commit SHA from PR, waiting for 3 
seconds and retrying."
+echo
+sleep 3
+TARGET_COMMIT_SHA="$(gh api '${{ github.event.pull_request.url }}' 
--jq .merge_commit_sha)"
+if [[ ${TARGET_COMMIT_SHA} == "" ]]; then
+  echo
+  echo "Could not retrieve merge commit SHA from PR, falling back 
to PR head SHA."
+  echo
+  TARGET_COMMIT_SHA="${{ github.event.pull_request.head.sha }}"
+fi
+  fi
+  echo "TARGET_COMMIT_SHA=${TARGET_COMMIT_SHA}"
+  echo "TARGET_COMMIT_SHA=${TARGET_COMMIT_SHA}" >> ${GITHUB_ENV}
   echo "target-commit-sha=${TARGET_COMMIT_SHA}" >> ${GITHUB_OUTPUT}
 if: github.event_name == 'pull_request_target'
   # The labels in the event aren't updated when re-triggering the job, So 
lets hit the API to get



Re: [PR] Retry merge commit retrieval on failure [airflow]

2024-04-24 Thread via GitHub


potiuk merged PR #39231:
URL: https://github.com/apache/airflow/pull/39231





Re: [PR] Pinecone provider support for `pinecone-client`>=3 [airflow]

2024-04-24 Thread via GitHub


eladkal commented on code in PR #37307:
URL: https://github.com/apache/airflow/pull/37307#discussion_r1578266977


##
airflow/providers/pinecone/provider.yaml:
##
@@ -45,7 +45,7 @@ dependencies:
   # Pinecone Python SDK v3.0.0 was released at 2024-01-16 and introduce some 
breaking changes.
   # It's crucial to adhere to the v3.0.0 Migration Guide before the 
upper-bound limitation can be removed.
   # 
https://canyon-quilt-082.notion.site/Pinecone-Python-SDK-v3-0-0-Migration-Guide-056d3897d7634bf7be399676a4757c7b

Review Comment:
   This note can now be removed isn't it?






Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


eladkal commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1578261459


##
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##
@@ -400,7 +400,7 @@ def attempt_task_runs(self):
 else:
 task = run_task_response["tasks"][0]
 self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-self.queued(task_key, task.task_arn)
+self.running_state(task_key, task.task_arn)

Review Comment:
   ah maybe it's because the code doesn't import `running_state`






Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


eladkal commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1578260345


##
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##
@@ -400,7 +400,7 @@ def attempt_task_runs(self):
 else:
 task = run_task_response["tasks"][0]
 self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-self.queued(task_key, task.task_arn)
+self.running_state(task_key, task.task_arn)

Review Comment:
   but still, Compat tests for 2.6 / 2.7.1 / 2.8.0 passed in the CI.
   I think it means that we are missing coverage for the code path that invokes 
this function.
   cc @potiuk am I right?






Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


o-nikolas commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1578255216


##
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##
@@ -400,7 +400,7 @@ def attempt_task_runs(self):
 else:
 task = run_task_response["tasks"][0]
 self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-self.queued(task_key, task.task_arn)
+self.running_state(task_key, task.task_arn)

Review Comment:
   Ooo, I see what you mean now, good catch! I suppose we could set a min 
version, but instead I'll try calling that new method in a try/catch and swallow 
the exception. The only real consequence of that is that task adoption will not 
work (but this is a nice to have and many executors don't support it), the code 
to support that is what introduced this regression in the first place anyway. 
So task adoption will only work in 2.10 forward, but the rest will be backwards 
compatible.
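
   The try/catch approach described above could look roughly like this (a 
minimal sketch with illustrative names, not the actual ECS executor code):

   ```python
   class LegacyBaseExecutor:
       """Stand-in for an older BaseExecutor that lacks running_state()."""

       def __init__(self):
           self.running = set()

       def queued(self, key, info=None):
           pass


   class EcsLikeExecutor(LegacyBaseExecutor):
       def report_running(self, key, arn):
           try:
               # Only exists on newer Airflow; enables task adoption.
               self.running_state(key, arn)
           except AttributeError:
               # Older base class: swallow the error. Task adoption is
               # simply unavailable; everything else keeps working.
               pass


   executor = EcsLikeExecutor()
   executor.report_running("task_key", "arn:aws:ecs:...:task/abc")  # no crash
   ```

   The only consequence on older versions is the loss of task adoption, as 
noted above.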






Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


eladkal commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1578257489


##
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##
@@ -400,7 +400,7 @@ def attempt_task_runs(self):
 else:
 task = run_task_response["tasks"][0]
 self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-self.queued(task_key, task.task_arn)
+self.running_state(task_key, task.task_arn)

Review Comment:
   normally we backport by copying the needed function into the provider as a 
private function, with a note to remove the workaround once the min Airflow 
version is greater than what is needed.
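
   That backport pattern might be sketched like this (illustrative only; the 
real provider code would copy whatever upstream function it actually needs):

   ```python
   class FakeExecutor:
       """Minimal stand-in with the attributes the helper touches."""

       def __init__(self):
           self.queued_tasks = {"task_key": "cmd"}
           self.running = set()


   # TODO: remove this workaround when the provider's minimum Airflow
   # version is new enough to call self.running_state() directly.
   def _running_state(executor, key, info=None):
       """Private copy of the newer base-executor behavior (illustrative)."""
       executor.queued_tasks.pop(key, None)  # no longer queued
       executor.running.add(key)             # now tracked as running


   ex = FakeExecutor()
   _running_state(ex, "task_key", info="arn:aws:ecs:...")
   print(sorted(ex.running))  # ['task_key']
   ```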






[PR] Bump minimum Airflow version in providers to Airflow 2.7.0 [airflow]

2024-04-24 Thread via GitHub


eladkal opened a new pull request, #39240:
URL: https://github.com/apache/airflow/pull/39240

   
   
   
   
   
   
   





(airflow) branch main updated: Fixed side effect of menu filtering causing disappearing menus (#39229)

2024-04-24 Thread potiuk
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
 new 0d2c0c5cf0 Fixed side effect of menu filtering causing disappearing 
menus (#39229)
0d2c0c5cf0 is described below

commit 0d2c0c5cf04ef886a8211820d0dc2f4cd8c47251
Author: Jarek Potiuk 
AuthorDate: Wed Apr 24 19:09:09 2024 +0200

Fixed side effect of menu filtering causing disappearing menus (#39229)

The default implementation of filter_permitted_menu_items had a
subtle side-effect. The filtering on child items was done in-place
and modified the menu itself, so it was enough to get the same
worker serve requests for multiple users for the same menu to get
the items removed for subsequent user - even if they had permission
to see it.

Deepcopying the menu items before filtering them should fix the problem

Fixes: #39204
Fixes: #39135
---
 airflow/auth/managers/base_auth_manager.py| 14 --
 tests/auth/managers/test_base_auth_manager.py | 40 +++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/airflow/auth/managers/base_auth_manager.py 
b/airflow/auth/managers/base_auth_manager.py
index 7bb4e92889..44fc53a66e 100644
--- a/airflow/auth/managers/base_auth_manager.py
+++ b/airflow/auth/managers/base_auth_manager.py
@@ -21,6 +21,7 @@ from abc import abstractmethod
 from functools import cached_property
 from typing import TYPE_CHECKING, Container, Literal, Sequence
 
+from flask_appbuilder.menu import MenuItem
 from sqlalchemy import select
 
 from airflow.auth.managers.models.resource_details import (
@@ -34,7 +35,6 @@ from airflow.utils.session import NEW_SESSION, provide_session
 
 if TYPE_CHECKING:
 from flask import Blueprint
-from flask_appbuilder.menu import MenuItem
 from sqlalchemy.orm import Session
 
 from airflow.auth.managers.models.base_user import BaseUser
@@ -397,13 +397,21 @@ class BaseAuthManager(LoggingMixin):
 )
 accessible_items = []
 for menu_item in items:
+menu_item_copy = MenuItem(
+name=menu_item.name,
+icon=menu_item.icon,
+label=menu_item.label,
+childs=[],
+baseview=menu_item.baseview,
+cond=menu_item.cond,
+)
 if menu_item.childs:
 accessible_children = []
 for child in menu_item.childs:
 if 
self.security_manager.has_access(ACTION_CAN_ACCESS_MENU, child.name):
 accessible_children.append(child)
-menu_item.childs = accessible_children
-accessible_items.append(menu_item)
+menu_item_copy.childs = accessible_children
+accessible_items.append(menu_item_copy)
 return accessible_items
 
 @abstractmethod
diff --git a/tests/auth/managers/test_base_auth_manager.py 
b/tests/auth/managers/test_base_auth_manager.py
index 64d33f6065..a39b60787c 100644
--- a/tests/auth/managers/test_base_auth_manager.py
+++ b/tests/auth/managers/test_base_auth_manager.py
@@ -313,3 +313,43 @@ class TestBaseAuthManager:
 assert result[1].name == "item3"
 assert len(result[1].childs) == 1
 assert result[1].childs[0].name == "item3.1"
+
+@patch.object(EmptyAuthManager, "security_manager")
+def test_filter_permitted_menu_items_twice(self, mock_security_manager, 
auth_manager):
+mock_security_manager.has_access.side_effect = [
+# 1st call
+True,  # menu 1
+False,  # menu 2
+True,  # menu 3
+True,  # Item 3.1
+False,  # Item 3.2
+# 2nd call
+False,  # menu 1
+True,  # menu 2
+True,  # menu 3
+False,  # Item 3.1
+True,  # Item 3.2
+]
+
+menu = Menu()
+menu.add_link("item1")
+menu.add_link("item2")
+menu.add_link("item3")
+menu.add_link("item3.1", category="item3")
+menu.add_link("item3.2", category="item3")
+
+result = auth_manager.filter_permitted_menu_items(menu.get_list())
+
+assert len(result) == 2
+assert result[0].name == "item1"
+assert result[1].name == "item3"
+assert len(result[1].childs) == 1
+assert result[1].childs[0].name == "item3.1"
+
+result = auth_manager.filter_permitted_menu_items(menu.get_list())
+
+assert len(result) == 2
+assert result[0].name == "item2"
+assert result[1].name == "item3"
+assert len(result[1].childs) == 1
+assert result[1].childs[0].name == "item3.2"
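
The in-place mutation described in the commit message can be reproduced with a
minimal sketch (plain Python classes standing in for the FAB menu items; all
names are illustrative):

```python
class Item:
    """Toy stand-in for flask_appbuilder.menu.MenuItem."""

    def __init__(self, name, childs=None):
        self.name = name
        self.childs = childs or []


def filter_in_place(items, allowed):
    # Buggy variant: filtering mutates the shared menu objects, so the
    # dropped children are gone for every subsequent request too.
    for item in items:
        item.childs = [c for c in item.childs if c.name in allowed]
    return items


def filter_with_copy(items, allowed):
    # Fixed variant: copy each item before filtering its children,
    # mirroring the MenuItem copy introduced by this commit.
    return [
        Item(item.name, [c for c in item.childs if c.name in allowed])
        for item in items
    ]


menu = [Item("admin", [Item("pools"), Item("variables")])]
filter_in_place(menu, allowed={"variables"})
# The shared menu has now lost "pools" for every later request:
print([c.name for c in menu[0].childs])  # ['variables']
```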



Re: [I] Pools have disappeared from the Admin menu [airflow]

2024-04-24 Thread via GitHub


potiuk closed issue #39204: Pools have disappeared from the Admin menu
URL: https://github.com/apache/airflow/issues/39204





Re: [I] front - admin menu - drop-down non deterministic [airflow]

2024-04-24 Thread via GitHub


potiuk closed issue #39135: front - admin menu - drop-down non deterministic
URL: https://github.com/apache/airflow/issues/39135





Re: [PR] Fixed side effect of menu filtering causing disappearing menus [airflow]

2024-04-24 Thread via GitHub


potiuk merged PR #39229:
URL: https://github.com/apache/airflow/pull/39229





Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]

2024-04-24 Thread via GitHub


kahlstrm commented on code in PR #39177:
URL: https://github.com/apache/airflow/pull/39177#discussion_r1578227079


##
airflow/utils/log/file_task_handler.py:
##
@@ -384,7 +380,13 @@ def _read(
 worker_log_full_path = Path(self.local_base, worker_log_rel_path)
 local_messages, local_logs = 
self._read_from_local(worker_log_full_path)
 messages_list.extend(local_messages)
-if is_running and not executor_messages:
+if (
+ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
+and not executor_messages
+and not remote_logs

Review Comment:
   Exactly, and this is what we encountered in our environment after 
upgrading.






Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


eladkal commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1578223632


##
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##
@@ -400,7 +400,7 @@ def attempt_task_runs(self):
 else:
 task = run_task_response["tasks"][0]
 self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-self.queued(task_key, task.task_arn)
+self.running_state(task_key, task.task_arn)

Review Comment:
   but what will happen if someone installs a newer version of the provider with 
Airflow 2.8.x / 2.9.0?
   The call to `self.running_state` would not work, would it?
   
   It's quite surprising to me that the backcompat tests passed. Am I missing 
something here?






Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


eladkal commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1578223632


##
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##
@@ -400,7 +400,7 @@ def attempt_task_runs(self):
 else:
 task = run_task_response["tasks"][0]
 self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-self.queued(task_key, task.task_arn)
+self.running_state(task_key, task.task_arn)

Review Comment:
   but what will happen if someone installs a newer version of the provider with 
Airflow 2.8.x / 2.9.0?
   The call to `self.running_state` would not work, would it?






Re: [PR] Fixed side effect of menu filtering causing disappearing menus [airflow]

2024-04-24 Thread via GitHub


potiuk commented on PR #39229:
URL: https://github.com/apache/airflow/pull/39229#issuecomment-2075405977

   I had to make it a bit more complex, because of infinite recursion





(airflow) branch main updated (26e5b9ffe0 -> 6db6fef357)

2024-04-24 Thread bbovenzi
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 26e5b9ffe0 Resolve deprecations in `tests/utils/` (#39228)
 add 6db6fef357 Use grid view for Task Instance's `log_url` (#39183)

No new revisions were added by this update.

Summary of changes:
 airflow/models/taskinstance.py  | 10 ++
 tests/models/test_taskinstance.py   |  9 +
 tests/providers/smtp/notifications/test_smtp.py |  2 +-
 3 files changed, 12 insertions(+), 9 deletions(-)



Re: [PR] Use grid view for Task Instance's `log_url` [airflow]

2024-04-24 Thread via GitHub


bbovenzi merged PR #39183:
URL: https://github.com/apache/airflow/pull/39183





Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]

2024-04-24 Thread via GitHub


RNHTTR commented on code in PR #39177:
URL: https://github.com/apache/airflow/pull/39177#discussion_r1578209347


##
airflow/utils/log/file_task_handler.py:
##
@@ -384,7 +380,13 @@ def _read(
 worker_log_full_path = Path(self.local_base, worker_log_rel_path)
 local_messages, local_logs = 
self._read_from_local(worker_log_full_path)
 messages_list.extend(local_messages)
-if is_running and not executor_messages:
+if (
+ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
+and not executor_messages
+and not remote_logs

Review Comment:
   I see. So, when clearing a _previous_ task instance try, if you don't have 
remote logs configured, you won't be able to see this try's task instance logs 
until the task completes?






Re: [PR] Fetch served logs when no remote/executor logs available for non-running task try [airflow]

2024-04-24 Thread via GitHub


RNHTTR commented on code in PR #39177:
URL: https://github.com/apache/airflow/pull/39177#discussion_r1578196224


##
airflow/utils/log/file_task_handler.py:
##
@@ -404,11 +406,15 @@ def _read(
 )
 log_pos = len(logs)
 messages = "".join([f"*** {x}\n" for x in messages_list])
+end_of_log = ti.try_number != try_number or ti.state not in (

Review Comment:
   Nitpick: Since you check whether `ti.state` is or isn't in 
`(TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)` twice, would it make 
sense to replace `is_running` with `is_in_running_or_deferred` and use that 
when checking for `executor_messages`/`remote_logs` and when defining 
`end_of_log`?
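
   The suggested refactor could be sketched as follows (a hedged sketch; names 
other than `is_in_running_or_deferred` are illustrative, not the actual 
`file_task_handler` code):

   ```python
   RUNNING_OR_DEFERRED = {"running", "deferred"}


   def plan_log_read(ti_state, ti_try_number, requested_try_number):
       # Compute the state check once, then reuse it both for deciding
       # whether to fetch served logs and for computing end_of_log.
       is_in_running_or_deferred = ti_state in RUNNING_OR_DEFERRED
       fetch_served_logs = is_in_running_or_deferred
       end_of_log = (
           ti_try_number != requested_try_number or not is_in_running_or_deferred
       )
       return fetch_served_logs, end_of_log


   print(plan_log_read("running", 2, 2))  # (True, False)
   print(plan_log_read("success", 2, 2))  # (False, True)
   ```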






Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


o-nikolas commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1578193059


##
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##
@@ -400,7 +400,7 @@ def attempt_task_runs(self):
 else:
 task = run_task_response["tasks"][0]
 self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-self.queued(task_key, task.task_arn)
+self.running_state(task_key, task.task_arn)

Review Comment:
   It still functioned _mostly_ correctly. The running and queued task sets are 
maintained mostly by the base executor and used for emitting metrics and to 
calculate open slots. I think the regression is only present in airflow 2.9






Re: [PR] Pinecone provider support for `pinecone-client`>=3 [airflow]

2024-04-24 Thread via GitHub


rawwar commented on PR #37307:
URL: https://github.com/apache/airflow/pull/37307#issuecomment-2075351057

   @sunank200 , @Lee-W, Do you have any more feedback for the PR?





Re: [PR] Fix trigger_kwargs encryption/decryption on db migration [airflow]

2024-04-24 Thread via GitHub


dstandish commented on PR #38876:
URL: https://github.com/apache/airflow/pull/38876#issuecomment-2075334734

   Should we consider yanking 2.9.0 once 2.9.1 is out with this fix?





(airflow) branch main updated (14b631294d -> 26e5b9ffe0)

2024-04-24 Thread taragolis
This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 14b631294d Fix: Only quote the keys of the query_parameters in 
MSGraphOperator (#39207)
 add 26e5b9ffe0 Resolve deprecations in `tests/utils/` (#39228)

No new revisions were added by this update.

Summary of changes:
 tests/deprecations_ignore.yml  | 38 --
 tests/utils/log/test_log_reader.py |  1 -
 tests/utils/test_dates.py  |  8 +++--
 tests/utils/test_db_cleanup.py |  4 +++
 tests/utils/test_email.py  | 34 ---
 tests/utils/test_log_handlers.py   | 12 +++
 tests/utils/test_sqlalchemy.py |  2 ++
 tests/utils/test_state.py  |  1 +
 .../test_task_handler_with_custom_formatter.py |  7 +++-
 tests/utils/test_types.py  |  2 ++
 10 files changed, 56 insertions(+), 53 deletions(-)



Re: [PR] Resolve deprecations in `tests/utils/` [airflow]

2024-04-24 Thread via GitHub


Taragolis merged PR #39228:
URL: https://github.com/apache/airflow/pull/39228





[I] KubernetesPodOperator fails due to getting logs [airflow]

2024-04-24 Thread via GitHub


antoniocorralsierra opened a new issue, #39239:
URL: https://github.com/apache/airflow/issues/39239

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes 8.0.1
   apache-airflow-providers-celery  3.6.1
   
   ### Apache Airflow version
   
   2.8.3
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   I have an Airflow instance deployed on a GKE cluster with Helm (KEDA is 
active on the workers, scaling from 1 to 5 with cooldownPeriod set to 240). I 
use CeleryKubernetesExecutor and run the tasks with KubernetesPodOperator on 
the Celery workers. To avoid keeping a task slot busy on a Celery worker while 
a KubernetesPodOperator pod is running, I enable deferrable mode on the 
operators and set poll_interval to 10 seconds. After that, many errors with a 
404 status code appear.
   
   Situation:
   
   `example_task = KubernetesPodOperator(task_id="example_task", 
name="example_task", get_logs=True, on_finish_action="delete_pod", 
log_events_on_failure=True, deferrable=True, poll_interval=10, 
logging_interval=None)`
   
   We can suppose that the rest of input params are correct.
   
   The steps are:
   
   1. The Celery worker gets the task.
   2. The KubernetesPodOperator creates a new pod on the cluster that executes 
the task.
   3. The status of the task changes to deferred.
   4. Checking the triggerer log, I see that the task is complete: 
`[2024-04-24T12:48:57.740+] {triggerer_job_runner.py:623} INFO - trigger 
example_task (ID 668324) completed`
   5. Checking the status of the pod, it is completed (success) and finished at 
[2024-04-24 12:48:51.122].
   6. The task changes to queued status, waiting for a free slot on a Celery 
worker.
   7. When a slot is available on a Celery worker, the task begins to run.
   8. After running, the task finishes with an error and changes to 
up_for_retry status.
   
   The task log is:
   `[2024-04-24, 12:51:21 UTC] {taskinstance.py:2731} ERROR - Task failed with 
exception`
   `Traceback (most recent call last):`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py",
 line 444, in _execute_task`
   `result = _execute_callable(context=context, **execute_callable_kwargs)`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py",
 line 414, in _execute_callable`
   `return execute_callable(context=context, **execute_callable_kwargs)`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py",
 line 1604, in resume_execution`
   `return execute_callable(context)`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 738, in trigger_reentry`
   `self.write_logs(self.pod)`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 771, in write_logs`
   `logs = self.pod_manager.read_pod_logs(`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
324, in wrapped_f`
   `return self(f, *args, **kw)`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
404, in __call__`
   `do = self.iter(retry_state=retry_state)`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
360, in iter`
   `raise retry_exc.reraise()`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
193, in reraise`
   `raise self.last_attempt.result()`
   `  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, 
in result`
   `return self.__get_result()`
   `  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, 
in __get_result`
   `raise self._exception`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
407, in __call__`
   `result = fn(*args, **kwargs)`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py",
 line 668, in read_pod_logs`
   `logs = self._client.read_namespaced_pod_log(`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py",
 line 23957, in read_namespaced_pod_log`
   `return self.read_namespaced_pod_log_with_http_info(name, namespace, 
**kwargs)  # noqa: E501`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py",
 line 24076, in read_namespaced_pod_log_with_http_info`
   `return self.api_client.call_api(`
   `  File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py",
 line 348, in call_api`
   `return self.__call_api(resource_path, method,`
 

[PR] Remove unnecessary validation from cncf provider. [airflow]

2024-04-24 Thread via GitHub


VShkaberda opened a new pull request, #39238:
URL: https://github.com/apache/airflow/pull/39238

   
   
   
   
   
   
   
   Remove check for `initialExecutors` (if `dynamicAllocation` is enabled) as 
unnecessary. The [Spark 
documentation](https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation)
 states that default value for `spark.dynamicAllocation.initialExecutors` is 
`spark.dynamicAllocation.minExecutors`. There is no need to pass 
`initialExecutors` explicitly.
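
   The default the PR relies on can be illustrated with a small sketch (not the 
provider's actual validation code; the function name is illustrative):

   ```python
   def effective_initial_executors(conf: dict) -> int:
       # Per the Spark docs, spark.dynamicAllocation.initialExecutors
       # defaults to spark.dynamicAllocation.minExecutors when unset.
       return int(
           conf.get(
               "spark.dynamicAllocation.initialExecutors",
               conf.get("spark.dynamicAllocation.minExecutors", 0),
           )
       )


   print(effective_initial_executors({"spark.dynamicAllocation.minExecutors": 2}))  # 2
   ```

   Since Spark itself fills in the value, an explicit `initialExecutors` check 
in the provider adds nothing.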
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Remove unnecessary validation from cncf provider. [airflow]

2024-04-24 Thread via GitHub


boring-cyborg[bot] commented on PR #39238:
URL: https://github.com/apache/airflow/pull/39238#issuecomment-2075282097

   Congratulations on your first Pull Request and welcome to the Apache Airflow 
community! If you have any issues or are unsure about anything, please check 
our Contributors' Guide 
(https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (ruff, mypy and type 
annotations). Our [pre-commits]( 
https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#prerequisites-for-pre-commit-hooks)
 will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in 
`docs/` directory). Adding a new operator? Check this short 
[guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst)
 Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze 
environment](https://github.com/apache/airflow/blob/main/dev/breeze/doc/README.rst)
 for testing locally, it's a heavy docker but it ships with a working Airflow 
and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get 
the final approval from Committers.
   - Please follow [ASF Code of 
Conduct](https://www.apache.org/foundation/policies/conduct) for all 
communication including (but not limited to) comments on Pull Requests, Mailing 
list and Slack.
   - Be sure to read the [Airflow Coding style]( 
https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#coding-style-and-best-practices).
   - Always keep your Pull Requests rebased, otherwise your build might fail 
due to changes not related to your commits.
   Apache Airflow is a community-driven project and together we are making it 
better.
   In case of doubts contact the developers at:
   Mailing List: d...@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





Re: [PR] Fix DataprocSubmitJobOperator in deferrable mode=True when task is marked as failed. [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39230:
URL: https://github.com/apache/airflow/pull/39230#discussion_r1578107932


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -91,20 +93,28 @@ def serialize(self):
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"cancel_on_kill": self.cancel_on_kill,
 },
 )
 
 async def run(self):
-while True:
-job = await self.get_async_hook().get_job(
-project_id=self.project_id, region=self.region, 
job_id=self.job_id
-)
-state = job.status.state
-self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
-if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
-break
-await asyncio.sleep(self.polling_interval_seconds)
-yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": 
job})
+try:
+while True:
+job = await self.get_async_hook().get_job(
+project_id=self.project_id, region=self.region, 
job_id=self.job_id
+)
+state = job.status.state
+self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
+if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
+break
+await asyncio.sleep(self.polling_interval_seconds)
+yield TriggerEvent({"job_id": self.job_id, "job_state": state, 
"job": job})
+except asyncio.CancelledError:
+self.log.info("Task got cancelled.")
+if self.job_id and self.cancel_on_kill:
+await self.get_async_hook().cancel_job(

Review Comment:
   Yes i tested this



##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -91,20 +93,28 @@ def serialize(self):
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"cancel_on_kill": self.cancel_on_kill,
 },
 )
 
 async def run(self):
-while True:
-job = await self.get_async_hook().get_job(
-project_id=self.project_id, region=self.region, 
job_id=self.job_id
-)
-state = job.status.state
-self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
-if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
-break
-await asyncio.sleep(self.polling_interval_seconds)
-yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": 
job})
+try:
+while True:
+job = await self.get_async_hook().get_job(
+project_id=self.project_id, region=self.region, 
job_id=self.job_id
+)
+state = job.status.state
+self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
+if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
+break
+await asyncio.sleep(self.polling_interval_seconds)
+yield TriggerEvent({"job_id": self.job_id, "job_state": state, 
"job": job})
+except asyncio.CancelledError:
+self.log.info("Task got cancelled.")
+if self.job_id and self.cancel_on_kill:
+await self.get_async_hook().cancel_job(

Review Comment:
   I have added the test



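The pattern under review above — catch `asyncio.CancelledError` in the polling
loop and, when `cancel_on_kill` is set, cancel the remote job before exiting —
can be sketched in isolation. The `FakeHook` below is a stand-in for
illustration, not the real Dataproc hook:

```python
import asyncio


class FakeHook:
    """Stand-in for an async job hook; records whether cancel was requested."""

    def __init__(self):
        self.cancelled = False

    async def get_job_state(self):
        return "RUNNING"  # job never finishes, so the loop polls forever

    async def cancel_job(self):
        self.cancelled = True


async def poll(hook, cancel_on_kill=True, interval=0.01):
    try:
        while True:
            state = await hook.get_job_state()
            if state in ("DONE", "CANCELLED", "ERROR"):
                return state
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        # Mirrors the trigger: clean up the remote job, then propagate.
        if cancel_on_kill:
            await hook.cancel_job()
        raise


async def main():
    hook = FakeHook()
    task = asyncio.create_task(poll(hook))
    await asyncio.sleep(0.05)  # let the loop run a few iterations
    task.cancel()              # simulates the task being marked failed
    try:
        await task
    except asyncio.CancelledError:
        pass
    return hook.cancelled


print(asyncio.run(main()))  # True
```

With `cancel_on_kill=False` the handler skips the cleanup call, matching the
guard in the diff.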



Re: [PR] Fix DataprocSubmitJobOperator in deferrable mode=True when task is marked as failed. [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39230:
URL: https://github.com/apache/airflow/pull/39230#discussion_r1578105965


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -91,20 +93,28 @@ def serialize(self):
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"cancel_on_kill": self.cancel_on_kill,
 },
 )
 
 async def run(self):
-while True:
-job = await self.get_async_hook().get_job(
-project_id=self.project_id, region=self.region, 
job_id=self.job_id
-)
-state = job.status.state
-self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
-if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
-break
-await asyncio.sleep(self.polling_interval_seconds)
-yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": 
job})
+try:
+while True:
+job = await self.get_async_hook().get_job(
+project_id=self.project_id, region=self.region, 
job_id=self.job_id
+)
+state = job.status.state
+self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
+if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
+break
+await asyncio.sleep(self.polling_interval_seconds)
+yield TriggerEvent({"job_id": self.job_id, "job_state": state, 
"job": job})
+except asyncio.CancelledError:
+self.log.info("Task got cancelled.")
+if self.job_id and self.cancel_on_kill:
+await self.get_async_hook().cancel_job(
+job_id=self.job_id, project_id=self.project_id, 
region=self.region
+)

Review Comment:
   Yes, I have tested this.






Re: [PR] Fix DataprocSubmitJobOperator in deferrable mode=True when task is marked as failed. [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39230:
URL: https://github.com/apache/airflow/pull/39230#discussion_r1578100315


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -91,20 +93,28 @@ def serialize(self):
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"cancel_on_kill": self.cancel_on_kill,
 },
 )
 
 async def run(self):
-while True:
-job = await self.get_async_hook().get_job(
-project_id=self.project_id, region=self.region, 
job_id=self.job_id
-)
-state = job.status.state
-self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
-if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
-break
-await asyncio.sleep(self.polling_interval_seconds)
-yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": 
job})
+try:
+while True:
+job = await self.get_async_hook().get_job(
+project_id=self.project_id, region=self.region, 
job_id=self.job_id
+)
+state = job.status.state
+self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
+if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
+break
+await asyncio.sleep(self.polling_interval_seconds)
+yield TriggerEvent({"job_id": self.job_id, "job_state": state, 
"job": job})
+except asyncio.CancelledError:
+self.log.info("Task got cancelled.")
+if self.job_id and self.cancel_on_kill:
+await self.get_async_hook().cancel_job(
+job_id=self.job_id, project_id=self.project_id, 
region=self.region
+)

Review Comment:
   Yes. I tested this



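As the diffs above show, every constructor argument — including the new
`cancel_on_kill` — must be included in `serialize()`, because the triggerer
reinstantiates the trigger from exactly that `(classpath, kwargs)` pair. A toy
sketch of the convention (the class and its import path are hypothetical, not
Airflow's real `BaseTrigger`):

```python
class DemoTrigger:
    """Toy trigger following the (classpath, kwargs) serialization convention."""

    def __init__(self, job_id: str, polling_interval_seconds: int = 10,
                 cancel_on_kill: bool = True):
        self.job_id = job_id
        self.polling_interval_seconds = polling_interval_seconds
        self.cancel_on_kill = cancel_on_kill

    def serialize(self):
        # Anything omitted here is silently lost across the
        # serialize/deserialize round trip -- the bug class the PR fixes.
        return (
            "example.triggers.DemoTrigger",  # hypothetical import path
            {
                "job_id": self.job_id,
                "polling_interval_seconds": self.polling_interval_seconds,
                "cancel_on_kill": self.cancel_on_kill,
            },
        )


path, kwargs = DemoTrigger("job-123", cancel_on_kill=False).serialize()
clone = DemoTrigger(**kwargs)  # effectively what the triggerer process does
assert clone.cancel_on_kill is False
```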



Re: [PR] Add Grid button to Task Instance view [airflow]

2024-04-24 Thread via GitHub


bbovenzi commented on code in PR #39223:
URL: https://github.com/apache/airflow/pull/39223#discussion_r1578104205


##
airflow/www/templates/airflow/task_instance.html:
##
@@ -39,6 +39,9 @@ 
 {{ url_for(endpoint, dag_id=dag.dag_id, task_id=task_id, 
execution_date=execution_date) }}
   {%- endif -%}
 {% endmacro -%}
+

Review Comment:
   Nice! Could we include the task_id, dag_run_id, map_index in the params to 
link back to the same task instance?






Re: [PR] Fix DataprocSubmitJobOperator in deferrable mode=True when task is marked as failed. [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39230:
URL: https://github.com/apache/airflow/pull/39230#discussion_r1578101930


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -91,20 +93,28 @@ def serialize(self):
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"cancel_on_kill": self.cancel_on_kill,
 },
 )
 
 async def run(self):
-while True:
-job = await self.get_async_hook().get_job(
-project_id=self.project_id, region=self.region, 
job_id=self.job_id
-)
-state = job.status.state
-self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
-if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
-break
-await asyncio.sleep(self.polling_interval_seconds)
-yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": 
job})
+try:
+while True:
+job = await self.get_async_hook().get_job(
+project_id=self.project_id, region=self.region, 
job_id=self.job_id
+)
+state = job.status.state
+self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
+if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
+break
+await asyncio.sleep(self.polling_interval_seconds)
+yield TriggerEvent({"job_id": self.job_id, "job_state": state, 
"job": job})
+except asyncio.CancelledError:
+self.log.info("Task got cancelled.")
+if self.job_id and self.cancel_on_kill:
+await self.get_async_hook().cancel_job(

Review Comment:
   Yes. I tested this



##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -91,20 +93,28 @@ def serialize(self):
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"cancel_on_kill": self.cancel_on_kill,
 },
 )
 
 async def run(self):
-while True:
-job = await self.get_async_hook().get_job(
-project_id=self.project_id, region=self.region, 
job_id=self.job_id
-)
-state = job.status.state
-self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
-if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
-break
-await asyncio.sleep(self.polling_interval_seconds)
-yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": 
job})
+try:
+while True:
+job = await self.get_async_hook().get_job(
+project_id=self.project_id, region=self.region, 
job_id=self.job_id
+)
+state = job.status.state
+self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
+if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
+break
+await asyncio.sleep(self.polling_interval_seconds)
+yield TriggerEvent({"job_id": self.job_id, "job_state": state, 
"job": job})
+except asyncio.CancelledError:
+self.log.info("Task got cancelled.")
+if self.job_id and self.cancel_on_kill:
+await self.get_async_hook().cancel_job(

Review Comment:
   Yes i tested this







Re: [I] KubernetesPodOperator duplicating logs when interrupted [airflow]

2024-04-24 Thread via GitHub


raphaelauv commented on issue #39236:
URL: https://github.com/apache/airflow/issues/39236#issuecomment-2075176634

   could you try the latest version 8.1.1 of 
`apache-airflow-providers-cncf-kubernetes`





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578042750


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+"""Run the trigger."""
+try:
+while True:
+cluster = await self.fetch_cluster()
+state = cluster.status.state
+if state == ClusterStatus.State.ERROR:
+await self.delete_when_error_occurred(cluster)
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": state.DELETING,

Review Comment:
   I have changed it to `ClusterStatus.State.DELETING`



##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+"""Run the trigger."""
+try:
+while True:
+cluster = await self.fetch_cluster()
+state = cluster.status.state
+if state == ClusterStatus.State.ERROR:
+await self.delete_when_error_occurred(cluster)
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": state.DELETING,

Review Comment:
   I have changed it to `ClusterStatus.State.DELETING` and this is correct state






Re: [PR] Use grid view for Task Instance's `log_url` [airflow]

2024-04-24 Thread via GitHub


AetherUnbound commented on PR #39183:
URL: https://github.com/apache/airflow/pull/39183#issuecomment-2075151250

   @bbovenzi there's one test that's failing but it looks like it might be a 
flakey one, would you mind re-running it?





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578038004


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+"""Run the trigger."""
+try:
+while True:
+cluster = await self.fetch_cluster()
+state = cluster.status.state
+if state == ClusterStatus.State.ERROR:
+await self.delete_when_error_occurred(cluster)
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": state.ERROR,

Review Comment:
   ClusterStatus.State.DELETING is better






Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578035001


##
tests/providers/google/cloud/triggers/test_dataproc.py:
##
@@ -215,9 +228,48 @@ async def test_cluster_run_loop_is_still_running(
 await asyncio.sleep(0.5)
 
 assert not task.done()
-assert f"Current state is: {ClusterStatus.State.CREATING}"
+assert f"Current state is: {ClusterStatus.State.CREATING}."
 assert f"Sleeping for {TEST_POLL_INTERVAL} seconds."
 
+@pytest.mark.asyncio
+
@mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster")
+async def test_fetch_cluster_status(self, mock_get_cluster, 
cluster_trigger, async_get_cluster):
+mock_get_cluster.return_value = async_get_cluster(
+status=ClusterStatus(state=ClusterStatus.State.RUNNING)
+)
+cluster = await cluster_trigger.fetch_cluster()
+
+assert cluster.status.state == ClusterStatus.State.RUNNING, "The 
cluster state should be RUNNING"
+
+@pytest.mark.asyncio
+
@mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.delete_cluster")
+async def test_delete_when_error_occurred(self, mock_delete_cluster, 
cluster_trigger):
+mock_cluster = mock.MagicMock(spec=Cluster)
+type(mock_cluster).status = mock.PropertyMock(
+return_value=mock.MagicMock(state=ClusterStatus.State.ERROR)
+)
+
+mock_delete_future = asyncio.Future()
+mock_delete_future.set_result(None)
+mock_delete_cluster.return_value = mock_delete_future
+
+cluster_trigger.delete_on_error = True
+
+await cluster_trigger.delete_when_error_occurred(mock_cluster)
+
+mock_delete_cluster.assert_called_once_with(
+region=cluster_trigger.region,
+cluster_name=cluster_trigger.cluster_name,
+project_id=cluster_trigger.project_id,
+)
+
+mock_delete_cluster.reset_mock()
+cluster_trigger.delete_on_error = False
+
+await cluster_trigger.delete_when_error_occurred(mock_cluster)
+
+mock_delete_cluster.assert_not_called()
+

Review Comment:
   Added



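The test quoted above makes a plain `MagicMock` awaitable by handing it an
already-resolved `asyncio.Future`. A minimal standalone sketch of that trick
(`cleanup` is a simplified stand-in for the trigger's delete-on-error branch;
on Python 3.8+, `unittest.mock.AsyncMock` is an alternative):

```python
import asyncio
from unittest import mock


async def cleanup(hook, delete_on_error: bool):
    # Simplified stand-in for the trigger's delete-on-error branch.
    if delete_on_error:
        await hook.delete_cluster(name="demo")


loop = asyncio.new_event_loop()
hook = mock.MagicMock()

# A completed Future is awaitable, so returning one lets a MagicMock
# method be awaited without defining a real coroutine.
fut = loop.create_future()
fut.set_result(None)
hook.delete_cluster.return_value = fut

loop.run_until_complete(cleanup(hook, delete_on_error=True))
hook.delete_cluster.assert_called_once_with(name="demo")

hook.delete_cluster.reset_mock()
loop.run_until_complete(cleanup(hook, delete_on_error=False))
hook.delete_cluster.assert_not_called()
loop.close()
print("ok")
```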



[PR] Moved low-level logic to call endpoint from trigger to run method hook MSGraph [airflow]

2024-04-24 Thread via GitHub


dabla opened a new pull request, #39237:
URL: https://github.com/apache/airflow/pull/39237

   
   
   
   
   Moved the logic which calls the MSGraph endpoint from MSGraphTrigger to the 
run method of the KiotaRequestAdapterHook. That way we have the same approach 
as with the HttpHook and HttpOperator, as in the future the dream would be that 
both could become one. This is also in line with the desired approach in 
Airflow of having fewer, more generic operators which can use interchangeable 
(and thus more specialized) hooks to achieve the same goal.





Re: [I] KubernetesPodOperator duplicating logs when interrupted [airflow]

2024-04-24 Thread via GitHub


boring-cyborg[bot] commented on issue #39236:
URL: https://github.com/apache/airflow/issues/39236#issuecomment-2075062319

   Thanks for opening your first issue here! Be sure to follow the issue 
template! If you are willing to raise PR to address this issue please do so, no 
need to wait for approval.
   





[I] KubernetesPodOperator duplicating logs when interrupted [airflow]

2024-04-24 Thread via GitHub


Nikita-Sobolev opened a new issue, #39236:
URL: https://github.com/apache/airflow/issues/39236

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.8.1
   
   ### What happened?
   
   The KubernetesPodOperator duplicates a task's logs when the log read is 
interrupted (`log read interrupted but container base still running`). This 
happens randomly on different DAGs and on different runs of the same DAG. It 
may be connected to https://github.com/apache/airflow/issues/35019
   
   ### What you think should happen instead?
   
   Logs should not be duplicated.
   
   ### How to reproduce
   
   KubernetesPodOperator on cloud AKS cluster
   
   ### Operating System
   
   Ubuntu 22.04
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.8.1
   apache-airflow-providers-amazon==8.16.0
   apache-airflow-providers-celery==3.5.1
   apache-airflow-providers-cncf-kubernetes==7.13.0
   apache-airflow-providers-common-io==1.2.0
   apache-airflow-providers-common-sql==1.10.0
   apache-airflow-providers-docker==3.9.1
   apache-airflow-providers-elasticsearch==5.3.1
   apache-airflow-providers-ftp==3.7.0
   apache-airflow-providers-google==10.13.1
   apache-airflow-providers-grpc==3.4.1
   apache-airflow-providers-hashicorp==3.6.1
   apache-airflow-providers-http==4.8.0
   apache-airflow-providers-imap==3.5.0
   apache-airflow-providers-microsoft-azure==8.5.1
   apache-airflow-providers-mysql==5.5.1
   apache-airflow-providers-odbc==4.4.0
   apache-airflow-providers-openlineage==1.4.0
   apache-airflow-providers-postgres==5.10.0
   apache-airflow-providers-redis==3.6.0
   apache-airflow-providers-sendgrid==3.4.0
   apache-airflow-providers-sftp==4.8.1
   apache-airflow-providers-slack==8.5.1
   apache-airflow-providers-snowflake==5.2.1
   apache-airflow-providers-sqlite==3.7.0
   apache-airflow-providers-ssh==3.10.0
   google-cloud-orchestration-airflow==1.10.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   
![Untitled](https://github.com/apache/airflow/assets/59029283/f558cb03-75c0-4b25-ac8a-ff9f2945ece5)
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] openlineage: use `ProcessPoolExecutor` over `ThreadPoolExecutor`. [airflow]

2024-04-24 Thread via GitHub


JDarDagran opened a new pull request, #39235:
URL: https://github.com/apache/airflow/pull/39235

   
   
   
   
   
   
   
   ---
   
   This may possibly fix #39232.
   
   I don't really know how to write tests for this change (a load test is the 
obvious candidate, but do we run those as part of the regular test suite?). Some 
guidance would be very helpful.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Bugfix yaml parsing for GKEStartKueueInsideClusterOperator [airflow]

2024-04-24 Thread via GitHub


moiseenkov opened a new pull request, #39234:
URL: https://github.com/apache/airflow/pull/39234

   Bugfix yaml parsing for `GKEStartKueueInsideClusterOperator`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577939404


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+"""Run the trigger."""
+try:
+while True:
+cluster = await self.fetch_cluster()
+state = cluster.status.state
+if state == ClusterStatus.State.ERROR:
+await self.delete_when_error_occurred(cluster)
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": state.ERROR,

Review Comment:
   I think a better idea would be to return the state as-is, as it is fetched 
from the cluster. If you look at the code above, it's `state = 
cluster.status.state`.
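A minimal, self-contained sketch of the suggestion (hypothetical names, not the actual Dataproc trigger code): the event carries whatever state was last fetched rather than a hard-coded `ERROR` member.

```python
import asyncio
from enum import Enum


class State(Enum):
    CREATING = "CREATING"
    ERROR = "ERROR"
    DELETING = "DELETING"


async def fetch_cluster_state(states: list) -> State:
    # Stand-in for the async hook call that polls the cluster.
    await asyncio.sleep(0)
    return states.pop(0)


async def run_trigger(states: list) -> dict:
    # Poll until a terminal state, then report the state exactly as
    # fetched instead of the literal ERROR member.
    while True:
        state = await fetch_cluster_state(states)
        if state in (State.ERROR, State.DELETING):
            return {"cluster_name": "demo-cluster", "cluster_state": state.value}


print(asyncio.run(run_trigger([State.CREATING, State.ERROR])))
# {'cluster_name': 'demo-cluster', 'cluster_state': 'ERROR'}
```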



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Fix batching for BigQueryToPostgresOperator [airflow]

2024-04-24 Thread via GitHub


moiseenkov opened a new pull request, #39233:
URL: https://github.com/apache/airflow/pull/39233

   Fixed batching for the following operators:
   - BigQueryToPostgresOperator
   - BigQueryToMsSqlOperator
   - BigQueryToMySqlOperator
   
   Previously, if users specified the `batch_size` parameter, it was used only 
for reading from BigQuery; writing to the database was still performed with the 
default batch size of 1000 rows. This PR fixes that: the specified `batch_size` 
is now used both for reading from BigQuery and for writing to the database.
   
   Additionally, updated the system test.
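As a rough sketch of the fixed behaviour (hypothetical helper names, not the provider's actual code): a single `batch_size` drives both the read chunking and each write call.

```python
from typing import Callable, Iterable, Iterator, List


def batched(rows: Iterable, batch_size: int) -> Iterator[List]:
    # Chunk an iterable into lists of at most batch_size rows.
    batch: List = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def transfer(read_rows: Iterable, write_batch: Callable, batch_size: int) -> int:
    # The same batch_size governs the read chunking and each write call,
    # instead of writing with a hard-coded 1000-row default.
    written = 0
    for chunk in batched(read_rows, batch_size):
        write_batch(chunk)
        written += len(chunk)
    return written


chunks: List = []
print(transfer(range(10), chunks.append, 4))  # 10
print([len(c) for c in chunks])               # [4, 4, 2]
```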


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] openlineage, celery: scheduler hanging when emitting lots of OL events via HTTP [airflow]

2024-04-24 Thread via GitHub


JDarDagran opened a new issue, #39232:
URL: https://github.com/apache/airflow/issues/39232

   ### Apache Airflow Provider(s)
   
   celery, openlineage
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Apache Airflow version
   
   2.9.0 but happens on 2.7+ too
   
   ### Operating System
   
   Darwin MacBook-Pro.local 23.4.0 Darwin Kernel Version 23.4.0: Fri Mar 15 
00:10:42 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6000 arm64
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   The issue can be reproduced in all environments, both in local with breeze 
and cloud deployment, e.g. Astro Cloud.
   
   ### What happened
   
   The OpenLineage listener hooks into DagRun state changes via 
`on_dag_run_running/failed/success`. When OL events are emitted via HTTP at 
large scale, the scheduler hangs and needs a restart. **The issue appears to be 
happening only with `CeleryExecutor`.**
   
   This couldn't be reproduced when disabling OpenLineage (with `[openlineage] 
disabled = True`) or with any other OpenLineage transport that doesn't use HTTP. 
I also experimented with using raw `urllib3` or `httpx` as alternatives to 
`requests`. All of the experiments produced the same bug, resulting in the 
scheduler hanging.
   
   ### What you think should happen instead
   
   When reproducing with a local breeze setup and CeleryExecutor, there's this 
strange behaviour:
   
   ```htop```:
   
![image](https://github.com/apache/airflow/assets/3889552/9e006e71-cf50-42b9-bff5-0e2e72b080fa)
   
   ```lsof | grep CLOSE_WAIT```:
   
![image](https://github.com/apache/airflow/assets/3889552/03087fbf-9744-4301-905a-226b0f721a97)
   
   Stack from main loop of scheduler:
   
   ```bash
   Traceback for thread 152 (airflow) [] (most recent call last):
   (Python) File "/usr/local/bin/airflow", line 8, in <module>
   sys.exit(main())
   (Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
   args.func(args)
   (Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in 
command
   return func(*args, **kwargs)
   (Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
   return f(*args, **kwargs)
   (Python) File 
"/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in 
wrapped_function
   return func(*args, **kwargs)
   (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 58, in scheduler
   run_command_with_daemon_option(
   (Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 
85, in run_command_with_daemon_option
   callback()
   (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 61, in <lambda>
   callback=lambda: _run_scheduler_job(args),
   (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 49, in _run_scheduler_job
   run_job(job=job_runner.job, execute_callable=job_runner._execute)
   (Python) File "/opt/airflow/airflow/utils/session.py", line 84, in 
wrapper
   return func(*args, session=session, **kwargs)
   (Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
   return execute_job(job, execute_callable=execute_callable)
   (Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in 
execute_job
   ret = execute_callable()
   (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 
847, in _execute
   self._run_scheduler_loop()
   (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 
982, in _run_scheduler_loop
   self.job.executor.heartbeat()
   (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 
240, in heartbeat
   self.trigger_tasks(open_slots)
   (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 
298, in trigger_tasks
   self._process_tasks(task_tuples)
   (Python) File 
"/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, 
in _process_tasks
   key_and_async_results = 
self._send_tasks_to_celery(task_tuples_to_send)
   (Python) File 
"/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 342, 
in _send_tasks_to_celery
   key_and_async_results = list(
   (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", 
line 484, in _chain_from_iterable_of_lists
   for element in iterable:
   (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", 
line 619, in result_iterator
   yield fs.pop().result()
   (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", 
line 439, in result
   self._condition.wait(timeout)
   (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
   waiter.acquire()
   ```
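The last frames above show the scheduler parked in `Future.result()` on a future that never completes. A minimal reproduction of that blocking pattern, with a timeout added so the sketch itself terminates:

```python
from concurrent.futures import Future, TimeoutError as FutureTimeout

# A future that is never completed: a stand-in for a
# _send_tasks_to_celery work item whose worker never delivers a result.
pending = Future()

try:
    # The scheduler's equivalent of this call has no timeout, so it
    # parks forever in waiter.acquire(); the timeout here lets the
    # sketch return instead of hanging.
    pending.result(timeout=0.1)
    blocked = False
except FutureTimeout:
    blocked = True

print(blocked)  # True
```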
   
   Stack from one of the spawned child scheduler processes
   
   ```bash
   Traceback 

Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577943509


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+"""Run the trigger."""
+try:
+while True:
+cluster = await self.fetch_cluster()
+state = cluster.status.state
+if state == ClusterStatus.State.ERROR:
+await self.delete_when_error_occurred(cluster)
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": state.ERROR,

Review Comment:
   But wouldn't that be an inaccurate state, since we triggered the delete 
action after that state was observed? 
   It might be good to fetch the latest state, or use 
`ClusterStatus.State.DELETING`, since we know that would be the cluster state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577943509


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+"""Run the trigger."""
+try:
+while True:
+cluster = await self.fetch_cluster()
+state = cluster.status.state
+if state == ClusterStatus.State.ERROR:
+await self.delete_when_error_occurred(cluster)
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": state.ERROR,

Review Comment:
   But wouldn't that be an inaccurate state, since we triggered the delete 
action after that state was observed? 
   It might be good to fetch the latest state, or use 
`ClusterStatus.State.DELETING`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577939404


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+"""Run the trigger."""
+try:
+while True:
+cluster = await self.fetch_cluster()
+state = cluster.status.state
+if state == ClusterStatus.State.ERROR:
+await self.delete_when_error_occurred(cluster)
+yield TriggerEvent(
+{
+"cluster_name": self.cluster_name,
+"cluster_state": state.ERROR,

Review Comment:
   I think a better idea would be to return the state as-is, as it is fetched 
from the cluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

2024-04-24 Thread via GitHub


sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577936986


##
airflow/providers/google/cloud/triggers/dataproc.py:
##
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 "gcp_conn_id": self.gcp_conn_id,
 "impersonation_chain": self.impersonation_chain,
 "polling_interval_seconds": self.polling_interval_seconds,
+"delete_on_error": self.delete_on_error,
 },
 )
 
 async def run(self) -> AsyncIterator[TriggerEvent]:
-while True:
-cluster = await self.get_async_hook().get_cluster(
-project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+"""Run the trigger."""

Review Comment:
   Removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] ECS Executor: Set tasks to RUNNING state once active [airflow]

2024-04-24 Thread via GitHub


eladkal commented on code in PR #39212:
URL: https://github.com/apache/airflow/pull/39212#discussion_r1577932047


##
airflow/providers/amazon/aws/executors/ecs/ecs_executor.py:
##
@@ -400,7 +400,7 @@ def attempt_task_runs(self):
 else:
 task = run_task_response["tasks"][0]
 self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-self.queued(task_key, task.task_arn)
+self.running_state(task_key, task.task_arn)

Review Comment:
   Does this mean that the executor won't work for airflow<2.10?
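One way such a compatibility concern is sometimes handled is a feature check with a fallback. This is only an illustrative sketch with stand-in classes, not the actual `BaseExecutor` API:

```python
class OldBase:
    """Stand-in for an executor base class that predates running_state()."""

    def __init__(self):
        self.events = []

    def queued(self, key, arn):
        self.events.append(("queued", key))


class NewBase(OldBase):
    """Stand-in for a base class that also offers running_state()."""

    def running_state(self, key, arn):
        self.events.append(("running", key))


def report_task_started(executor, key, arn):
    # Prefer the newer API when present; fall back to queued() so the
    # same executor code keeps working against an older core version.
    if hasattr(executor, "running_state"):
        executor.running_state(key, arn)
    else:
        executor.queued(key, arn)


old, new = OldBase(), NewBase()
report_task_started(old, "t1", "arn:1")
report_task_started(new, "t2", "arn:2")
print(old.events, new.events)  # [('queued', 't1')] [('running', 't2')]
```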



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


