codeant-ai-for-open-source[bot] commented on code in PR #36368:
URL: https://github.com/apache/superset/pull/36368#discussion_r2748001408
##########
superset/commands/tasks/prune.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
+
+import sqlalchemy as sa
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.commands.base import BaseCommand
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class TaskPruneCommand(BaseCommand):
+    """
+    Command to prune the tasks table by deleting rows older than the specified
+    retention period.
+
+    This command deletes records from the `Task` table that are in terminal states
+    (success, failure, aborted, or timed_out) and have not been changed within the
+    specified number of days. It helps in maintaining the database by removing
+    outdated entries and freeing up space.
+
+    Attributes:
+        retention_period_days (int): The number of days for which records should be retained.
+            Records older than this period will be deleted.
+        max_rows_per_run (int | None): The maximum number of rows to delete in a single run.
+            If provided and greater than zero, rows are selected
+            deterministically from the oldest first (by timestamp then id)
+            up to this limit in this execution.
+    """  # noqa: E501
+
+    def __init__(self, retention_period_days: int, max_rows_per_run: int | None = None):
+        """
+        :param retention_period_days: Number of days to keep in the tasks table
+        :param max_rows_per_run: The maximum number of rows to delete in a single run.
+        If provided and greater than zero, rows are selected deterministically from the
+        oldest first (by timestamp then id) up to this limit in this execution.
+        """  # noqa: E501
+        self.retention_period_days = retention_period_days
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has a IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs that need to be deleted
+        # Only delete completed tasks (success, failure, or aborted)
+        from superset.models.tasks import Task
+
+        select_stmt = sa.select(Task.id).where(
+            Task.ended_at < datetime.now() - timedelta(days=self.retention_period_days),

Review Comment:
   **Suggestion:** Potential timezone bug: the code uses naive `datetime.now()` to build the cutoff and compares it to `Task.ended_at`. If `Task.ended_at` is timezone-aware this can cause incorrect comparisons or DB errors; compute a timezone-aware threshold (e.g. `datetime.now(timezone.utc) - ...`) to match timezone-aware columns. [logic error]

<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>

```mdx
- ❌ Incorrect pruning cutoff retains old rows.
- ⚠️ Scheduled pruning miscalculates retention threshold.
```

</details>

```suggestion
        from datetime import timezone

        threshold = datetime.now(timezone.utc) - timedelta(days=self.retention_period_days)

        select_stmt = sa.select(Task.id).where(
            Task.ended_at < threshold,
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. Create a Task row whose `ended_at` column is timezone-aware (e.g. stored as UTC). The Task model is imported at superset/commands/tasks/prune.py:74.
2. Instantiate and run the prune command: cmd = TaskPruneCommand(retention_period_days=30) cmd.run() — run() executes the select at superset/commands/tasks/prune.py:76-86.
3. The code constructs a naive cutoff with datetime.now() at line 77 and compares it to Task.ended_at.
4. If Task.ended_at is timezone-aware (database TIMESTAMP WITH TIME ZONE), the naive vs aware comparison can lead to incorrect cutoff semantics or unexpected DB-level interpretation, causing incorrect rows to be selected.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.
**Path:** superset/commands/tasks/prune.py
**Line:** 76:77
**Comment:** *Logic Error: Potential timezone bug: the code uses naive `datetime.now()` to build the cutoff and compares it to `Task.ended_at`. If `Task.ended_at` is timezone-aware this can cause incorrect comparisons or DB errors; compute a timezone-aware threshold (e.g. `datetime.now(timezone.utc) - ...`) to match timezone-aware columns.

Validate the correctness of the flagged issue. If correct, How can I resolve this?
If you propose a fix, implement it and please make it concise.
```

</details>


##########
superset/daos/tasks.py:
##########
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task DAO for Global Task Framework (GTF)"""
+
+import logging
+from datetime import datetime, timezone
+from typing import Any
+
+from superset_core.api.tasks import TaskProperties, TaskScope, TaskStatus
+
+from superset.daos.base import BaseDAO
+from superset.daos.exceptions import DAODeleteFailedError
+from superset.extensions import db
+from superset.models.task_subscribers import TaskSubscriber
+from superset.models.tasks import Task
+from superset.tasks.filters import TaskFilter
+from superset.tasks.utils import get_active_dedup_key
+from superset.utils.decorators import transaction
+
+logger = logging.getLogger(__name__)
+
+
+class TaskDAO(BaseDAO[Task]):
+    """
+    Concrete TaskDAO for the Global Task Framework (GTF).
+
+    Provides database access operations for async tasks including
+    creation, status management, filtering, and subscription management
+    for shared tasks.
+    """
+
+    base_filter = TaskFilter
+
+    @classmethod
+    def find_by_task_key(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+    ) -> Task | None:
+        """
+        Find active task by type, key, scope, and user.
+
+        Uses dedup_key internally for efficient querying with a unique index.
+        Only returns tasks that are active (pending or in progress).
+
+        Uniqueness logic by scope:
+        - private: scope + task_type + task_key + user_id
+        - shared/system: scope + task_type + task_key (user-agnostic)
+
+        :param task_type: Task type to filter by
+        :param task_key: Task identifier for deduplication
+        :param scope: Task scope (private/shared/system)
+        :param user_id: User ID (required for private tasks)
+        :returns: Task instance or None if not found or not active
+        """
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Simple single-column query with unique index
+        return db.session.query(Task).filter(Task.dedup_key == dedup_key).one_or_none()

Review Comment:
   **Suggestion:** Logic bug: `find_by_task_key` claims to return "active" tasks (pending or in-progress) but the query only filters by `dedup_key` and does not restrict by `status`, so it may return finished tasks that happen to share the same dedup key format; add an explicit filter for active statuses. [logic error]

<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>

```mdx
- ❌ Task deduplication/joining may return finished tasks.
- ⚠️ SubmitTaskCommand create-vs-join logic affected (referenced in docs).
```

</details>

```suggestion
        # Simple single-column query with unique index, restrict to active statuses
        return (
            db.session.query(Task)
            .filter(
                Task.dedup_key == dedup_key,
                Task.status.in_([TaskStatus.PENDING.value, TaskStatus.IN_PROGRESS.value]),
            )
            .one_or_none()
        )
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. Create a test or run a Python REPL that imports TaskDAO from superset/daos/tasks.py and Task from superset/models/tasks.py.
2. Insert a Task row manually via ORM in the DB with dedup_key equal to the active format (e.g., call TaskDAO.create_task(...) at superset/daos/tasks.py:84 to create a task, then set its status to a finished value like TaskStatus.SUCCESS.value and (intentionally) leave dedup_key unchanged).
3. Call TaskDAO.find_by_task_key(task_type, task_key, scope, user_id) (defined at superset/daos/tasks.py:48-55, query at lines 79-80).
4. Observe that find_by_task_key returns the record even though its status is finished, because the current code (superset/daos/tasks.py:79-80) only filters by dedup_key and not by status. This demonstrates the reported logic gap.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.
**Path:** superset/daos/tasks.py
**Line:** 79:80
**Comment:** *Logic Error: Logic bug: `find_by_task_key` claims to return "active" tasks (pending or in-progress) but the query only filters by `dedup_key` and does not restrict by `status`, so it may return finished tasks that happen to share the same dedup key format; add an explicit filter for active statuses.

Validate the correctness of the flagged issue. If correct, How can I resolve this?
If you propose a fix, implement it and please make it concise.
```

</details>


##########
superset/commands/tasks/prune.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
+
+import sqlalchemy as sa
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.commands.base import BaseCommand
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class TaskPruneCommand(BaseCommand):
+    """
+    Command to prune the tasks table by deleting rows older than the specified
+    retention period.
+
+    This command deletes records from the `Task` table that are in terminal states
+    (success, failure, aborted, or timed_out) and have not been changed within the
+    specified number of days. It helps in maintaining the database by removing
+    outdated entries and freeing up space.
+
+    Attributes:
+        retention_period_days (int): The number of days for which records should be retained.
+            Records older than this period will be deleted.
+        max_rows_per_run (int | None): The maximum number of rows to delete in a single run.
+            If provided and greater than zero, rows are selected
+            deterministically from the oldest first (by timestamp then id)
+            up to this limit in this execution.
+    """  # noqa: E501
+
+    def __init__(self, retention_period_days: int, max_rows_per_run: int | None = None):
+        """
+        :param retention_period_days: Number of days to keep in the tasks table
+        :param max_rows_per_run: The maximum number of rows to delete in a single run.
+        If provided and greater than zero, rows are selected deterministically from the
+        oldest first (by timestamp then id) up to this limit in this execution.
+        """  # noqa: E501
+        self.retention_period_days = retention_period_days
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has a IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs that need to be deleted
+        # Only delete completed tasks (success, failure, or aborted)
+        from superset.models.tasks import Task
+
+        select_stmt = sa.select(Task.id).where(
+            Task.ended_at < datetime.now() - timedelta(days=self.retention_period_days),
+            Task.status.in_(
+                [
+                    TaskStatus.SUCCESS.value,
+                    TaskStatus.FAILURE.value,
+                    TaskStatus.ABORTED.value,
+                    TaskStatus.TIMED_OUT.value,
+                ]
+            ),
+        )
+
+        # Optionally limited by max_rows_per_run
+        # order by oldest first for deterministic deletion
+        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
+            select_stmt = select_stmt.order_by(
+                Task.ended_at.asc(), Task.id.asc()
+            ).limit(self.max_rows_per_run)
+
+        ids_to_delete = db.session.execute(select_stmt).scalars().all()
+
+        total_rows = len(ids_to_delete)
+
+        logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
+
+        next_logging_threshold = 1
+
+        # Iterate over the IDs in batches
+        for i in range(0, total_rows, batch_size):
+            batch_ids = ids_to_delete[i : i + batch_size]
+
+            # Delete the selected batch using IN clause
+            result = db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
+
+            # Update the total number of deleted records
+            total_deleted += result.rowcount
+
+            # Explicitly commit the transaction given that if an error occurs, we want to ensure that the  # noqa: E501
+            # records that have been deleted so far are committed
+            db.session.commit()
+
+            # Log the number of deleted records every 1% increase in progress
+            percentage_complete = (total_deleted / total_rows) * 100
+            if percentage_complete >= next_logging_threshold:
+                logger.info(
+                    "Deleted %s rows from the tasks table older than %s days (%d%% complete)",  # noqa: E501
+                    f"{total_deleted:,}",
+                    self.retention_period_days,
+                    percentage_complete,
+                )
+                next_logging_threshold += 1
+
+        elapsed_time = time.time() - start_time
+        minutes, seconds = divmod(elapsed_time, 60)
+        formatted_time = f"{int(minutes):02}:{int(seconds):02}"
+        logger.info(
+            "Pruning complete: %s rows deleted in %s",
+            f"{total_deleted:,}",
+            formatted_time,
+        )
+
+    def validate(self) -> None:
+        pass

Review Comment:
   **Suggestion:** Missing validation: `validate()` is a no-op and `retention_period_days` (or `max_rows_per_run`) could be set to invalid values (negative or non-integer), which can cause unintended mass deletions; implement checks to ensure `retention_period_days` is a non-negative integer and `max_rows_per_run` is either None or a positive integer. [logic error]

<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>

```mdx
- ❌ Misconfiguration can delete almost all task rows.
- ⚠️ Admin-scheduled prune job becomes destructive.
```

</details>

```suggestion
        # retention_period_days must be a non-negative integer
        if not isinstance(self.retention_period_days, int) or self.retention_period_days < 0:
            raise ValueError("retention_period_days must be a non-negative integer")

        # max_rows_per_run must be None or a positive integer
        if self.max_rows_per_run is not None and (
            not isinstance(self.max_rows_per_run, int) or self.max_rows_per_run <= 0
        ):
            raise ValueError("max_rows_per_run must be a positive integer or None")
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. Instantiate the command with an invalid retention, e.g. TaskPruneCommand(retention_period_days=-1) (constructor at superset/commands/tasks/prune.py:54-63).
2. Call cmd.run() (method defined at superset/commands/tasks/prune.py:64). The select cutoff is computed at superset/commands/tasks/prune.py:76 as datetime.now() - timedelta(days=self.retention_period_days).
3. With retention_period_days = -1 the cutoff becomes datetime.now() + 1 day, so the WHERE clause at lines 76-86 matches almost all rows (Task.ended_at < tomorrow).
4. The loop at lines 104-126 will therefore delete a far larger set of Task rows than intended (potentially all), demonstrating the missing validation leads to dangerous mass deletions.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.
**Path:** superset/commands/tasks/prune.py
**Line:** 138:138
**Comment:** *Logic Error: Missing validation: `validate()` is a no-op and `retention_period_days` (or `max_rows_per_run`) could be set to invalid values (negative or non-integer), which can cause unintended mass deletions; implement checks to ensure `retention_period_days` is a non-negative integer and `max_rows_per_run` is either None or a positive integer.

Validate the correctness of the flagged issue. If correct, How can I resolve this?
If you propose a fix, implement it and please make it concise.
```

</details>


##########
tests/unit_tests/daos/test_tasks.py:
##########
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+from superset_core.api.tasks import TaskStatus
+
+from superset.daos.tasks import TaskDAO
+from superset.models.tasks import Task
+
+
+class TestTaskDAO:
+    """Test TaskDAO functionality"""
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_active(self, mock_session, mock_get_user_id):
+        """Test finding active task by task_key"""
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.task_key = "test-id"
+        mock_task.task_type = "test_type"
+        mock_task.status = TaskStatus.PENDING.value
+
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = mock_task
+
+        result = TaskDAO.find_by_task_key("test_type", "test-key")
+
+        assert result == mock_task
+        mock_session.query.assert_called_once_with(Task)
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_not_found(self, mock_session, mock_get_user_id):
+        """Test finding task by task_key returns None when not found"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        result = TaskDAO.find_by_task_key("test_type", "nonexistent-key")
+
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_ignores_finished_tasks(
+        self, mock_session, mock_get_user_id
+    ):
+        """Test that find_by_task_key only returns pending/in-progress tasks"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        # Should not find SUCCESS task
+        result = TaskDAO.find_by_task_key("test_type", "finished-key")
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_success(self, mock_session, mock_create, mock_get_user_id):
+        """Test successful task creation.
+
+        TaskDAO.create_task is now a pure data operation - it assumes the caller
+        (SubmitTaskCommand) has already checked for duplicates and holds the lock.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+        )
+
+        assert result == mock_task
+        mock_create.assert_called_once()
+        mock_session.commit.assert_called_once()
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_with_user_id(
+        self, mock_session, mock_create, mock_get_user_id
+    ):
+        """Test task creation with explicit user_id.
+
+        TaskDAO.create_task should use the provided user_id for the task.
+        """
+        mock_get_user_id.return_value = 99  # Different from provided user_id
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+            user_id=42,  # Explicit user_id
+        )
+
+        assert result == mock_task
+        # Verify create was called with user_id in attributes
+        call_args = mock_create.call_args
+        assert call_args[1]["attributes"]["user_id"] == 42
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_with_properties(
+        self, mock_session, mock_create, mock_get_user_id
+    ):
+        """Test task creation with properties.
+
+        Properties are set via update_properties() after task creation.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+            properties={"timeout": 300},
+        )
+
+        assert result == mock_task
+        mock_task.update_properties.assert_called_once_with({"timeout": 300})
+
+    @patch("superset.daos.tasks.TaskDAO.find_one_or_none")
+    @patch("superset.daos.tasks.db.session")
+    def test_abort_task_pending_success(self, mock_session, mock_find):
+        """Test successful abort of pending task - goes directly to ABORTED"""
+        mock_task = MagicMock(spec=Task)
+        mock_task.status = TaskStatus.PENDING.value
+        mock_task.is_shared = False
+        mock_task.subscriber_count = 0
+        mock_find.return_value = mock_task
+
+        result = TaskDAO.abort_task("test-uuid")
+
+        # Now returns Task instead of bool
+        assert result is mock_task
+        mock_task.set_status.assert_called_once_with(TaskStatus.ABORTED)
+        mock_session.commit.assert_called_once()
+
+    @patch("superset.daos.tasks.TaskDAO.find_one_or_none")
+    @patch("superset.daos.tasks.db.session")
+    def test_abort_task_in_progress_abortable(self, mock_session, mock_find):
+        """Test abort of in-progress task with abort handler.
+
+        Should transition to ABORTING status.
+        """
+        mock_task = MagicMock(spec=Task)
+        mock_task.status = TaskStatus.IN_PROGRESS.value
+        mock_task.properties = {"is_abortable": True}  # Dict, not MagicMock
+        mock_task.is_shared = False
+        mock_task.subscriber_count = 0
+        mock_find.return_value = mock_task
+
+        result = TaskDAO.abort_task("test-uuid")
+
+        # Now returns Task instead of bool
+        assert result is mock_task
+        # Should set status to ABORTING, not ABORTED
+        assert mock_task.status == TaskStatus.ABORTING.value
+        mock_session.merge.assert_called_once_with(mock_task)
+        mock_session.commit.assert_called_once()

Review Comment:
   **Suggestion:** The test for aborting an in-progress abortable task expects a commit after transitioning to ABORTING, but `TaskDAO.abort_task` only merges the task and returns without committing; change the commit assertion to `assert_not_called()` to match the implementation. [possible bug]

<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>

```mdx
- ❌ Unit test fails in tests/unit_tests/daos/test_tasks.py.
- ⚠️ CI run will be blocked by this failing test.
```

</details>

```suggestion
        mock_session.commit.assert_not_called()
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. Run the test: pytest tests/unit_tests/daos/test_tasks.py::TestTaskDAO::test_abort_task_in_progress_abortable. The test invokes TaskDAO.abort_task at tests/unit_tests/daos/test_tasks.py:193 and asserts commit at line 200.
2. The abort path under test is IN_PROGRESS and abortable; execution goes to superset/daos/tasks.py:171-233 (abort_task). There the code sets task.status = TaskStatus.ABORTING.value and calls db.session.merge(task), then returns — it does not call db.session.commit() in that code path.
3. Because the test patches superset.daos.tasks.db.session, mock_session.merge will be called (tests assert that), but mock_session.commit will not. The test's assertion at tests/unit_tests/daos/test_tasks.py:200 expecting commit therefore fails.
4. Change the test to assert commit was not called (or intentionally add a commit in the DAO) to align test expectations with the implemented abort_task behavior.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.
**Path:** tests/unit_tests/daos/test_tasks.py
**Line:** 200:200
**Comment:** *Possible Bug: The test for aborting an in-progress abortable task expects a commit after transitioning to ABORTING, but `TaskDAO.abort_task` only merges the task and returns without committing; change the commit assertion to `assert_not_called()` to match the implementation.

Validate the correctness of the flagged issue. If correct, How can I resolve this?
If you propose a fix, implement it and please make it concise.
```

</details>


##########
superset/daos/tasks.py:
##########
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task DAO for Global Task Framework (GTF)"""
+
+import logging
+from datetime import datetime, timezone
+from typing import Any
+
+from superset_core.api.tasks import TaskProperties, TaskScope, TaskStatus
+
+from superset.daos.base import BaseDAO
+from superset.daos.exceptions import DAODeleteFailedError
+from superset.extensions import db
+from superset.models.task_subscribers import TaskSubscriber
+from superset.models.tasks import Task
+from superset.tasks.filters import TaskFilter
+from superset.tasks.utils import get_active_dedup_key
+from superset.utils.decorators import transaction
+
+logger = logging.getLogger(__name__)
+
+
+class TaskDAO(BaseDAO[Task]):
+    """
+    Concrete TaskDAO for the Global Task Framework (GTF).
+
+    Provides database access operations for async tasks including
+    creation, status management, filtering, and subscription management
+    for shared tasks.
+    """
+
+    base_filter = TaskFilter
+
+    @classmethod
+    def find_by_task_key(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+    ) -> Task | None:
+        """
+        Find active task by type, key, scope, and user.
+
+        Uses dedup_key internally for efficient querying with a unique index.
+        Only returns tasks that are active (pending or in progress).
+
+        Uniqueness logic by scope:
+        - private: scope + task_type + task_key + user_id
+        - shared/system: scope + task_type + task_key (user-agnostic)
+
+        :param task_type: Task type to filter by
+        :param task_key: Task identifier for deduplication
+        :param scope: Task scope (private/shared/system)
+        :param user_id: User ID (required for private tasks)
+        :returns: Task instance or None if not found or not active
+        """
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Simple single-column query with unique index
+        return db.session.query(Task).filter(Task.dedup_key == dedup_key).one_or_none()
+
+    @classmethod
+    @transaction()
+    def create_task(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+        payload: dict[str, Any] | None = None,
+        properties: TaskProperties | None = None,
+        **kwargs: Any,
+    ) -> Task:
+        """
+        Create a new task record in the database.
+
+        This is a pure data operation - assumes caller holds lock and has
+        already checked for existing tasks. Business logic (create vs join)
+        is handled by SubmitTaskCommand.
+
+        :param task_type: Type of task to create
+        :param task_key: Task identifier (required)
+        :param scope: Task scope (private/shared/system), defaults to private
+        :param user_id: User ID creating the task
+        :param payload: Optional user-defined context data (dict)
+        :param properties: Optional framework-managed runtime state (e.g., timeout)
+        :param kwargs: Additional task attributes (e.g., task_name)
+        :returns: Created Task instance
+        """
+        # Build dedup_key for active task
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Handle both TaskScope enum and string values
+        scope_value = scope.value if isinstance(scope, TaskScope) else scope
+
+        # Note: properties is handled separately via update_properties()
+        # because it's a hybrid property with only a getter
+        task_data = {
+            "task_type": task_type,
+            "task_key": task_key,
+            "scope": scope_value,
+            "status": TaskStatus.PENDING.value,
+            "dedup_key": dedup_key,
+            **kwargs,
+        }
+
+        # Handle payload - serialize to JSON if dict provided
+        if payload:

Review Comment:
   **Suggestion:** Bug with falsy payloads: the code uses `if payload:` which skips serializing valid but falsy payloads (e.g., empty dict or empty list). Use an explicit `is not None` check to preserve intentionally empty payloads. [logic error]

<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>

```mdx
- ❌ Intentionally empty task payloads are not persisted.
- ⚠️ Downstream task consumers miss expected context.
```

</details>

```suggestion
        # Handle payload - serialize to JSON if dict provided (allow empty payloads)
        if payload is not None:
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. In a test or integration scenario call TaskDAO.create_task(...) at superset/daos/tasks.py:84 with payload={} (explicit empty dict).
2. The create_task function execution reaches the payload handling block at superset/daos/tasks.py:132-136.
3. With the current condition `if payload:` (superset/daos/tasks.py:133), an empty dict evaluates to False and the code path that sets task_data["payload"] is skipped.
4. After insertion, inspect the Task row in the DB or task.payload attribute and observe the payload field is not set, demonstrating loss of intentionally provided empty payloads.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.
**Path:** superset/daos/tasks.py
**Line:** 132:133
**Comment:** *Logic Error: Bug with falsy payloads: the code uses `if payload:` which skips serializing valid but falsy payloads (e.g., empty dict or empty list). Use an explicit `is not None` check to preserve intentionally empty payloads.

Validate the correctness of the flagged issue. If correct, How can I resolve this?
If you propose a fix, implement it and please make it concise.
```

</details>


##########
tests/unit_tests/daos/test_tasks.py:
##########
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+from superset_core.api.tasks import TaskStatus
+
+from superset.daos.tasks import TaskDAO
+from superset.models.tasks import Task
+
+
+class TestTaskDAO:
+    """Test TaskDAO functionality"""
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_active(self, mock_session, mock_get_user_id):
+        """Test finding active task by task_key"""
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.task_key = "test-id"
+        mock_task.task_type = "test_type"
+        mock_task.status = TaskStatus.PENDING.value
+
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = mock_task
+
+        result = TaskDAO.find_by_task_key("test_type", "test-key")
+
+        assert result == mock_task
+        mock_session.query.assert_called_once_with(Task)
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_not_found(self, mock_session, mock_get_user_id):
+        """Test finding task by task_key returns None when not found"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        result = TaskDAO.find_by_task_key("test_type", "nonexistent-key")
+
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_ignores_finished_tasks(
+        self, mock_session, mock_get_user_id
+    ):
+        """Test that find_by_task_key only returns pending/in-progress tasks"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        # Should not find SUCCESS task
+        result = TaskDAO.find_by_task_key("test_type", "finished-key")
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_success(self, mock_session, mock_create, mock_get_user_id):
+        """Test successful task creation.
+
+        TaskDAO.create_task is now a pure data operation - it assumes the caller
+        (SubmitTaskCommand) has already checked for duplicates and holds the lock.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+        )
+
+        assert result == mock_task
+        mock_create.assert_called_once()
+        mock_session.commit.assert_called_once()
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_with_user_id(
+        self, mock_session, mock_create, mock_get_user_id
+    ):
+        """Test task creation with explicit user_id.
+
+        TaskDAO.create_task should use the provided user_id for the task.
+        """
+        mock_get_user_id.return_value = 99 # Different from provided user_id
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+            user_id=42, # Explicit user_id
+        )
+
+        assert result == mock_task
+        # Verify create was called with user_id in attributes
+        call_args = mock_create.call_args
+        assert call_args[1]["attributes"]["user_id"] == 42
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_with_properties(
+        self, mock_session, mock_create, mock_get_user_id
+    ):
+        """Test task creation with properties.
+
+        Properties are set via update_properties() after task creation.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+            properties={"timeout": 300},
+        )
+
+        assert result == mock_task
+        mock_task.update_properties.assert_called_once_with({"timeout": 300})
+
+    @patch("superset.daos.tasks.TaskDAO.find_one_or_none")
+    @patch("superset.daos.tasks.db.session")
+    def test_abort_task_pending_success(self, mock_session, mock_find):
+        """Test successful abort of pending task - goes directly to ABORTED"""
+        mock_task = MagicMock(spec=Task)
+        mock_task.status = TaskStatus.PENDING.value
+        mock_task.is_shared = False
+        mock_task.subscriber_count = 0
+        mock_find.return_value = mock_task
+
+        result = TaskDAO.abort_task("test-uuid")
+
+        # Now returns Task instead of bool
+        assert result is mock_task
+        mock_task.set_status.assert_called_once_with(TaskStatus.ABORTED)
+        mock_session.commit.assert_called_once()

Review Comment:
   **Suggestion:** The test for aborting a pending task asserts that `db.session.commit()` was called, but `TaskDAO.abort_task` for PENDING tasks only sets the status and returns without committing; change the assertion to assert the commit was not called. [possible bug]

<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>

```mdx
- ❌ Unit test fails in tests/unit_tests/daos/test_tasks.py.
- ⚠️ CI run will be blocked by this failing test.
```

</details>

```suggestion
        mock_session.commit.assert_not_called()
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. Run the test: pytest tests/unit_tests/daos/test_tasks.py::TestTaskDAO::test_abort_task_pending_success. The test calls TaskDAO.abort_task at tests/unit_tests/daos/test_tasks.py:172 and asserts commit at line 177.
2. The test patches TaskDAO.find_one_or_none and db.session; TaskDAO.abort_task is implemented in superset/daos/tasks.py:171-233 (function abort_task).
3. In abort_task (superset/daos/tasks.py:171-233) the PENDING branch sets task.set_status(TaskStatus.ABORTED) and returns the task, but there is no db.session.commit() call in that code path — only logging and status change.
4. Therefore the test's expectation that mock_session.commit was called (tests line 177) is incorrect; asserting commit was not called (or adjusting the DAO to commit intentionally) reproduces and resolves the mismatch.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.

**Path:** tests/unit_tests/daos/test_tasks.py
**Line:** 177:177

**Comment:**
*Possible Bug: The test for aborting a pending task asserts that `db.session.commit()` was called, but `TaskDAO.abort_task` for PENDING tasks only sets the status and returns without committing; change the assertion to assert the commit was not called.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
```

</details>

##########
tests/unit_tests/daos/test_tasks.py:
##########

@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+from superset_core.api.tasks import TaskStatus
+
+from superset.daos.tasks import TaskDAO
+from superset.models.tasks import Task
+
+
+class TestTaskDAO:
+    """Test TaskDAO functionality"""
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_active(self, mock_session, mock_get_user_id):
+        """Test finding active task by task_key"""
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.task_key = "test-id"
+        mock_task.task_type = "test_type"
+        mock_task.status = TaskStatus.PENDING.value
+
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = mock_task
+
+        result = TaskDAO.find_by_task_key("test_type", "test-key")
+
+        assert result == mock_task
+        mock_session.query.assert_called_once_with(Task)
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_not_found(self, mock_session, mock_get_user_id):
+        """Test finding task by task_key returns None when not found"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        result = TaskDAO.find_by_task_key("test_type", "nonexistent-key")
+
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_ignores_finished_tasks(
+        self, mock_session, mock_get_user_id
+    ):
+        """Test that find_by_task_key only returns pending/in-progress tasks"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        # Should not find SUCCESS task
+        result = TaskDAO.find_by_task_key("test_type", "finished-key")
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_success(self, mock_session, mock_create, mock_get_user_id):
+        """Test successful task creation.
+
+        TaskDAO.create_task is now a pure data operation - it assumes the caller
+        (SubmitTaskCommand) has already checked for duplicates and holds the lock.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+        )
+
+        assert result == mock_task
+        mock_create.assert_called_once()
+        mock_session.commit.assert_called_once()

Review Comment:
   **Suggestion:** The test incorrectly expects a DB transaction commit from `TaskDAO.create_task`, but the implementation only calls `db.session.flush()` (no commit). Replace the failing `commit` assertion with a check for `flush` to match the DAO behavior. [possible bug]

<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>

```mdx
- ❌ Unit test fails in tests/unit_tests/daos/test_tasks.py.
- ⚠️ CI run will be blocked by this failing test.
```

</details>

```suggestion
        mock_session.flush.assert_called_once()
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. Run the single unit test: pytest tests/unit_tests/daos/test_tasks.py::TestTaskDAO::test_create_task_success. The test file line invoking the DAO is at tests/unit_tests/daos/test_tasks.py:99 (result = TaskDAO.create_task(...)) and the failing assertion is at line 106 (mock_session.commit.assert_called_once()).
2. The test patches the DB session via @patch("superset.daos.tasks.db.session"), so the create_task call in tests routes to superset/daos/tasks.py:create_task (function span superset/daos/tasks.py:84-167).
3. Inspect superset/daos/tasks.py:create_task (lines 84-167) — the function calls db.session.flush() (to obtain the new task id) but does not call db.session.commit(). This is the implemented behavior.
4. Because the test asserts commit at tests/unit_tests/daos/test_tasks.py:106 but the DAO only flushes, the assertion fails. Updating the test to assert flush (or remove the commit assertion) matches the actual implementation.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.

**Path:** tests/unit_tests/daos/test_tasks.py
**Line:** 106:106

**Comment:**
*Possible Bug: The test incorrectly expects a DB transaction commit from `TaskDAO.create_task`, but the implementation only calls `db.session.flush()` (no commit). Replace the failing `commit` assertion with a check for `flush` to match the DAO behavior.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
```

</details>

##########
superset/commands/tasks/prune.py:
##########

@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
+
+import sqlalchemy as sa
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.commands.base import BaseCommand
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class TaskPruneCommand(BaseCommand):
+    """
+    Command to prune the tasks table by deleting rows older than the specified
+    retention period.
+
+    This command deletes records from the `Task` table that are in terminal states
+    (success, failure, aborted, or timed_out) and have not been changed within the
+    specified number of days. It helps in maintaining the database by removing
+    outdated entries and freeing up space.
+
+    Attributes:
+        retention_period_days (int): The number of days for which records should be retained.
+            Records older than this period will be deleted.
+        max_rows_per_run (int | None): The maximum number of rows to delete in a single run.
+            If provided and greater than zero, rows are selected
+            deterministically from the oldest first (by timestamp then id)
+            up to this limit in this execution.
+    """ # noqa: E501
+
+    def __init__(self, retention_period_days: int, max_rows_per_run: int | None = None):
+        """
+        :param retention_period_days: Number of days to keep in the tasks table
+        :param max_rows_per_run: The maximum number of rows to delete in a single run.
+        If provided and greater than zero, rows are selected deterministically from the
+        oldest first (by timestamp then id) up to this limit in this execution.
+        """ # noqa: E501
+        self.retention_period_days = retention_period_days
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999 # SQLite has an IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs that need to be deleted
+        # Only delete completed tasks (success, failure, or aborted)
+        from superset.models.tasks import Task
+
+        select_stmt = sa.select(Task.id).where(
+            Task.ended_at < datetime.now() - timedelta(days=self.retention_period_days),
+            Task.status.in_(
+                [
+                    TaskStatus.SUCCESS.value,
+                    TaskStatus.FAILURE.value,
+                    TaskStatus.ABORTED.value,
+                    TaskStatus.TIMED_OUT.value,
+                ]
+            ),
+        )
+
+        # Optionally limited by max_rows_per_run
+        # order by oldest first for deterministic deletion
+        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
+            select_stmt = select_stmt.order_by(
+                Task.ended_at.asc(), Task.id.asc()
+            ).limit(self.max_rows_per_run)
+
+        ids_to_delete = db.session.execute(select_stmt).scalars().all()
+
+        total_rows = len(ids_to_delete)
+
+        logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
+
+        next_logging_threshold = 1
+
+        # Iterate over the IDs in batches
+        for i in range(0, total_rows, batch_size):
+            batch_ids = ids_to_delete[i : i + batch_size]
+
+            # Delete the selected batch using IN clause
+            result = db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
+
+            # Update the total number of deleted records
+            total_deleted += result.rowcount
+
+            # Explicitly commit the transaction given that if an error occurs, we want to ensure that the # noqa: E501
+            # records that have been deleted so far are committed
+            db.session.commit()
+
+            # Log the number of deleted records every 1% increase in progress
+            percentage_complete = (total_deleted / total_rows) * 100
+            if percentage_complete >= next_logging_threshold:
+                logger.info(
+                    "Deleted %s rows from the tasks table older than %s days (%d%% complete)", # noqa: E501
+                    f"{total_deleted:,}",
+                    self.retention_period_days,
+                    percentage_complete,
+                )
+                next_logging_threshold += 1

Review Comment:
   **Suggestion:** Resource exhaustion: the code loads all matching task IDs into memory via `.scalars().all()` which can OOM on large tables; instead stream IDs from the DB and delete in batches (use a count query for progress, iterate over `Result.scalars()` and flush batches) to avoid building a huge list. [memory leak]

<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>

```mdx
- ❌ Prune command may OOM preventing DB cleanup.
- ⚠️ Scheduled maintenance tasks may fail (TaskPruneCommand).
- ⚠️ Database storage growth due to retained rows.
```

</details>

```suggestion
        # Compute total_rows efficiently (respect max_rows_per_run if provided)
        count_stmt = sa.select(sa.func.count()).select_from(Task).where(
            Task.ended_at < datetime.now() - timedelta(days=self.retention_period_days),
            Task.status.in_(
                [
                    TaskStatus.SUCCESS.value,
                    TaskStatus.FAILURE.value,
                    TaskStatus.ABORTED.value,
                    TaskStatus.TIMED_OUT.value,
                ]
            ),
        )
        total_rows = db.session.execute(count_stmt).scalar() or 0
        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
            total_rows = min(total_rows, self.max_rows_per_run)

        logger.info("Total rows to be deleted: %s", f"{total_rows:,}")

        next_logging_threshold = 1

        # Stream IDs and delete in batches to avoid loading all IDs into memory
        result = db.session.execute(select_stmt)
        batch_ids: list[int] = []
        for task_id in result.scalars():
            batch_ids.append(task_id)
            if len(batch_ids) >= batch_size:
                res = db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
                deleted = res.rowcount or 0
                total_deleted += deleted
                db.session.commit()

                percentage_complete = (total_deleted / total_rows) * 100 if total_rows else 100
                if percentage_complete >= next_logging_threshold:
                    logger.info(
                        "Deleted %s rows from the tasks table older than %s days (%.2f%% complete)",
                        f"{total_deleted:,}",
                        self.retention_period_days,
                        percentage_complete,
                    )
                    next_logging_threshold += 1
                batch_ids = []

        # Final batch if any
        if batch_ids:
            res = db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
            deleted = res.rowcount or 0
            total_deleted += deleted
            db.session.commit()
            percentage_complete = (total_deleted / total_rows) * 100 if total_rows else 100
            if percentage_complete >= next_logging_threshold:
                logger.info(
                    "Deleted %s rows from the tasks table older than %s days (%.2f%% complete)",
                    f"{total_deleted:,}",
                    self.retention_period_days,
                    percentage_complete,
                )
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. Populate the tasks table with a large number of terminal Task rows (e.g. >100k). Insert rows into the Task model referenced by `from superset.models.tasks import Task` (see superset/commands/tasks/prune.py:74 where Task is imported).
2. In a Python process, import and run the command directly:
   from superset.commands.tasks.prune import TaskPruneCommand
   cmd = TaskPruneCommand(retention_period_days=1)
   cmd.run()
   (run() is defined at superset/commands/tasks/prune.py:64 and will execute the code at lines 95-126).
3. When execution reaches line 95 (`ids_to_delete = db.session.execute(select_stmt).scalars().all()`) the code will materialize all matching IDs into memory.
4. Observe process memory growth and eventual OOM / memory pressure when the result set is very large, causing the prune run to crash or the host to kill the process.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.

**Path:** superset/commands/tasks/prune.py
**Line:** 95:126

**Comment:**
*Memory Leak: Resource exhaustion: the code loads all matching task IDs into memory via `.scalars().all()` which can OOM on large tables; instead stream IDs from the DB and delete in batches (use a count query for progress, iterate over `Result.scalars()` and flush batches) to avoid building a huge list.

Validate the correctness of the flagged issue.
If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
```

</details>

##########
superset/commands/tasks/cancel.py:
##########

@@ -0,0 +1,301 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unified cancel task command for GTF."""
+
+import logging
+from functools import partial
+from typing import TYPE_CHECKING
+
+from superset_core.api.tasks import TaskScope, TaskStatus
+
+from superset.commands.base import BaseCommand
+from superset.commands.tasks.exceptions import (
+    TaskAbortFailedError,
+    TaskNotAbortableError,
+    TaskNotFoundError,
+    TaskPermissionDeniedError,
+)
+from superset.extensions import security_manager
+from superset.tasks.locks import task_lock
+from superset.tasks.utils import get_active_dedup_key
+from superset.utils.core import get_user_id
+from superset.utils.decorators import on_error, transaction
+
+if TYPE_CHECKING:
+    from superset.models.tasks import Task
+
+logger = logging.getLogger(__name__)
+
+
+class CancelTaskCommand(BaseCommand):
+    """
+    Unified command to cancel a task.
+
+    Behavior:
+    - For private tasks or single-subscriber tasks: aborts the task
+    - For shared tasks with multiple subscribers (non-admin): unsubscribes user
+    - For shared tasks with force=True (admin only): aborts for all subscribers
+
+    The term "cancel" is user-facing; internally this may abort or unsubscribe.
+
+    This command acquires a distributed lock before starting a transaction to
+    prevent race conditions with concurrent submit/cancel operations.
+
+    Permission checks are deferred to inside the lock to minimize SELECTs:
+    we only fetch the task once, then validate permissions on the fetched data.
+    """
+
+    def __init__(self, task_uuid: str, force: bool = False):
+        """
+        Initialize the cancel command.
+
+        :param task_uuid: UUID of the task to cancel
+        :param force: If True, force abort even with multiple subscribers (admin only)
+        """
+        self._task_uuid = task_uuid
+        self._force = force
+        self._action_taken: str = (
+            "cancelled" # Will be set to 'aborted' or 'unsubscribed'
+        )
+        self._should_publish_abort: bool = False
+
+    def run(self) -> "Task":
+        """
+        Execute the cancel command with distributed locking.
+
+        The lock is acquired BEFORE starting the transaction to avoid holding
+        a DB connection during lock acquisition. Uses dedup_key as lock key
+        to ensure Submit and Cancel operations use the same lock.
+
+        :returns: The updated task model
+        """
+        from superset.daos.tasks import TaskDAO
+
+        # Lightweight fetch to compute dedup_key for locking
+        # This is needed to use the same lock key as SubmitTaskCommand
+        task = TaskDAO.find_one_or_none(
+            skip_base_filter=security_manager.is_admin(), uuid=self._task_uuid
+        )
+
+        if not task:
+            raise TaskNotFoundError()
+
+        # Compute dedup_key using the same logic as SubmitTaskCommand
+        dedup_key = get_active_dedup_key(
+            scope=task.scope,
+            task_type=task.task_type,
+            task_key=task.task_key,
+            user_id=task.user_id if task.is_private else None,
+        )
+
+        # Acquire lock BEFORE transaction starts
+        # Using dedup_key ensures Submit and Cancel use the same lock
+        with task_lock(dedup_key):
+            result = self._execute_with_transaction()
+
+        # Publish abort notification AFTER transaction commits
+        # This prevents race conditions where listeners check DB before commit
+        if self._should_publish_abort:
+            from superset.tasks.manager import TaskManager
+
+            TaskManager.publish_abort(self._task_uuid)
+
+        return result
+
+    @transaction(on_error=partial(on_error, reraise=TaskAbortFailedError))
+    def _execute_with_transaction(self) -> "Task":
+        """
+        Execute the cancel operation inside a transaction.
+
+        Combines fetch + validation + execution in a single transaction,
+        reducing the number of SELECTs from 3 to 1 (plus DAO operations).
+
+        :returns: The updated task model
+        """
+        from superset.daos.tasks import TaskDAO
+
+        # Check admin status (no DB access)
+        is_admin = security_manager.is_admin()
+
+        # Force flag requires admin
+        if self._force and not is_admin:
+            raise TaskPermissionDeniedError(
+                "Only administrators can force cancel a task"
+            )
+
+        # Single SELECT: fetch task and validate permissions on it
+        task = TaskDAO.find_one_or_none(skip_base_filter=is_admin, uuid=self._task_uuid)
+
+        if not task:
+            raise TaskNotFoundError()
+
+        # Validate permissions on the fetched task
+        self._validate_permissions(task, is_admin)
+
+        # Execute cancel and return updated task
+        return self._do_cancel(task, is_admin)
+
+    def _validate_permissions(self, task: "Task", is_admin: bool) -> None:
+        """
+        Validate permissions on an already-fetched task.
+
+        Permission rules by scope:
+        - private: Only creator or admin (already filtered by base_filter)
+        - shared: Subscribers or admin
+        - system: Only admin
+
+        :param task: The task to validate permissions for
+        :param is_admin: Whether current user is admin
+        :raises TaskAbortFailedError: If task is not in cancellable state
+        :raises TaskPermissionDeniedError: If user lacks permission
+        """
+        # Check if task is in a cancellable state
+        if task.status not in [
+            TaskStatus.PENDING.value,
+            TaskStatus.IN_PROGRESS.value,
+            TaskStatus.ABORTING.value, # Already aborting is OK (idempotent)
+        ]:
+            raise TaskAbortFailedError()
+
+        # Admin can cancel anything
+        if is_admin:
+            return
+
+        # Non-admin permission checks by scope
+        user_id = get_user_id()
+
+        if task.scope == TaskScope.SYSTEM.value:

Review Comment:
   **Suggestion:** Inconsistent TaskScope comparison: the code compares `task.scope` to `TaskScope.SYSTEM.value` but `task.scope` may be stored as either the enum member or a string; comparing to `.value` will miss matches when `task.scope` is the enum and allow unauthorized non-admin cancellations of system tasks. Normalize the scope to the enum before comparing (e.g., `TaskScope(task.scope) == TaskScope.SYSTEM`). [logic error]

<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>

```mdx
- ❌ Unauthorized cancellation of system tasks.
- ⚠️ CancelTaskCommand permission enforcement compromised.
- ⚠️ Task abort audit/integrity may be violated.
```

</details>

```suggestion
        # Normalize scope to TaskScope enum to support either stored string or enum
        if TaskScope(task.scope) == TaskScope.SYSTEM:
```

<details>
<summary><b>Steps of Reproduction ✅ </b></summary>

```mdx
1. Ensure a Task row exists such that TaskDAO.find_one_or_none() returns a model where `task.scope` is a TaskScope enum instance (not a string). The Cancel flow starts at `superset/commands/tasks/cancel.py:76` (def run) which fetches the task via `TaskDAO.find_one_or_none(...)` at lines `86-92`.
2. As a non-admin user, call CancelTaskCommand.run() (instantiate CancelTaskCommand and invoke run) so execution reaches `_execute_with_transaction()` at `superset/commands/tasks/cancel.py:119` and then `_validate_permissions()` at `superset/commands/tasks/cancel.py:152`.
3. Inside `_validate_permissions()` the code at lines `181-185` performs `if task.scope == TaskScope.SYSTEM.value:`. If `task.scope` is a TaskScope enum instance, this equality check will be False (enum != string), so the admin-only branch is skipped.
4. Because the system-scope guard was missed, the command proceeds to subsequent non-admin checks (subscriber checks at `superset/commands/tasks/cancel.py:187-192`) and may allow a non-admin to continue to `_do_cancel()` and abort/unsubscribe a system task, producing an unauthorized cancellation. This demonstrates the permission-check bypass.
```

</details>

<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>

```mdx
This is a comment left during a code review.

**Path:** superset/commands/tasks/cancel.py
**Line:** 181:181

**Comment:**
*Logic Error: Inconsistent TaskScope comparison: the code compares `task.scope` to `TaskScope.SYSTEM.value` but `task.scope` may be stored as either the enum member or a string; comparing to `.value` will miss matches when `task.scope` is the enum and allow unauthorized non-admin cancellations of system tasks. Normalize the scope to the enum before comparing (e.g., `TaskScope(task.scope) == TaskScope.SYSTEM`).

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
```

</details>

##########
superset/migrations/versions/2025_12_18_0220_create_tasks_table.py:
##########

@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Create tasks and task_subscriber tables for Global Task Framework (GTF) + +Revision ID: 4b2a8c9d3e1f +Revises: 9787190b3d89 +Create Date: 2025-12-18 02:20:00.000000 + +""" + +from sqlalchemy import ( + Column, + DateTime, + Integer, + String, + Text, + UniqueConstraint, +) + +from superset.migrations.shared.utils import ( + create_fks_for_table, + create_index, + create_table, + drop_fks_for_table, + drop_index, + drop_table, +) + +# revision identifiers, used by Alembic. +revision = "4b2a8c9d3e1f" +down_revision = "9787190b3d89" + +TASKS_TABLE = "tasks" +TASK_SUBSCRIBERS_TABLE = "task_subscribers" + + +def upgrade(): + """ + Create tasks and task_subscribers tables for the Global Task Framework (GTF). + + This migration creates: + 1. tasks table - unified tracking for all long running tasks + 2. task_subscribers table - multi-user task subscriptions for shared tasks + + The scope feature allows tasks to be: + - private: user-specific (default) + - shared: multi-user collaborative tasks + - system: admin-only background tasks + """ + # Create tasks table + create_table( + TASKS_TABLE, + Column("id", Integer, primary_key=True), + Column("uuid", String(36), nullable=False, unique=True), + Column("task_key", String(256), nullable=False), # For deduplication + Column("task_type", String(100), nullable=False), # e.g., 'sql_execution' + Column("task_name", String(256), nullable=True), # Human readable name + Column( + "scope", String(20), nullable=False, server_default="private" + ), # private/shared/system + Column("status", String(50), nullable=False), # PENDING, IN_PROGRESS, etc. + Column("dedup_key", String(512), nullable=False), # Computed deduplication key Review Comment: **Suggestion:** Compatibility/runtime error risk on MySQL/InnoDB with utf8mb4: `dedup_key` is defined as `String(512)` and then indexed uniquely; large indexed varchar columns can exceed MySQL index size limits and make the migration fail on common MySQL setups — reduce the column length (e.g. 
to 191) or use a hashed/shorter column for indexes to ensure the unique index can be created on all supported MySQL configurations. [possible bug] <details> <summary><b>Severity Level:</b> Critical 🚨</summary> ```mdx - ❌ Migration fails on MySQL utf8mb4 installs. - ⚠️ GTF unavailable until migration fixed. ``` </details> ```suggestion Column("dedup_key", String(191), nullable=False), # Computed deduplication key (shortened for indexed column compatibility) ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Migration defines dedup_key as Column(\"dedup_key\", String(512), nullable=False) at line 76 and then creates a unique index on dedup_key at line 91 (`create_index(... "idx_tasks_dedup_key" ...)`). 2. Run Alembic upgrade against a MySQL/MariaDB server with default utf8mb4 settings (common production configuration). 3. When op.create_index() (called via shared/utils.create_index at line 416 in that module) issues the CREATE INDEX, MySQL may reject it because a 512-char utf8mb4 varchar exceeds InnoDB prefix/index byte limits (index length > 767/1000 bytes), causing the migration to fail with a MySQL error (index key too long). 4. Observing this requires applying the migration on a MySQL instance configured with utf8mb4 and default InnoDB limits; failure is deterministic in that environment. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/migrations/versions/2025_12_18_0220_create_tasks_table.py **Line:** 76:76 **Comment:** *Possible Bug: Compatibility/runtime error risk on MySQL/InnoDB with utf8mb4: `dedup_key` is defined as `String(512)` and then indexed uniquely; large indexed varchar columns can exceed MySQL index size limits and make the migration fail on common MySQL setups — reduce the column length (e.g. to 191) or use a hashed/shorter column for indexes to ensure the unique index can be created on all supported MySQL configurations. 
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details> ########## superset/migrations/versions/2025_12_18_0220_create_tasks_table.py: ########## @@ -0,0 +1,217 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Create tasks and task_subscriber tables for Global Task Framework (GTF) + +Revision ID: 4b2a8c9d3e1f +Revises: 9787190b3d89 +Create Date: 2025-12-18 02:20:00.000000 + +""" + +from sqlalchemy import ( + Column, + DateTime, + Integer, + String, + Text, + UniqueConstraint, +) + +from superset.migrations.shared.utils import ( + create_fks_for_table, + create_index, + create_table, + drop_fks_for_table, + drop_index, + drop_table, +) + +# revision identifiers, used by Alembic. +revision = "4b2a8c9d3e1f" +down_revision = "9787190b3d89" + +TASKS_TABLE = "tasks" +TASK_SUBSCRIBERS_TABLE = "task_subscribers" + + +def upgrade(): + """ + Create tasks and task_subscribers tables for the Global Task Framework (GTF). + + This migration creates: + 1. tasks table - unified tracking for all long running tasks + 2. 
task_subscribers table - multi-user task subscriptions for shared tasks + + The scope feature allows tasks to be: + - private: user-specific (default) + - shared: multi-user collaborative tasks + - system: admin-only background tasks + """ + # Create tasks table + create_table( + TASKS_TABLE, + Column("id", Integer, primary_key=True), + Column("uuid", String(36), nullable=False, unique=True), + Column("task_key", String(256), nullable=False), # For deduplication + Column("task_type", String(100), nullable=False), # e.g., 'sql_execution' + Column("task_name", String(256), nullable=True), # Human readable name + Column( + "scope", String(20), nullable=False, server_default="private" + ), # private/shared/system + Column("status", String(50), nullable=False), # PENDING, IN_PROGRESS, etc. + Column("dedup_key", String(512), nullable=False), # Computed deduplication key + # AuditMixinNullable columns + Column("created_on", DateTime, nullable=True), + Column("changed_on", DateTime, nullable=True), + Column("created_by_fk", Integer, nullable=True), + Column("changed_by_fk", Integer, nullable=True), + # Task-specific columns + Column("started_at", DateTime, nullable=True), + Column("ended_at", DateTime, nullable=True), + Column("user_id", Integer, nullable=True), # User context for execution + Column("payload", Text, nullable=True), # JSON serialized task-specific data + Column("properties", Text, nullable=True), # JSON serialized properties + ) + + # Create indexes for optimal query performance + create_index(TASKS_TABLE, "idx_tasks_dedup_key", ["dedup_key"], unique=True) Review Comment: **Suggestion:** Logic bug: deduplication is scope-aware but the migration creates a globally-unique index on `dedup_key`, preventing the same dedup key from being used in different `scope` values; change the index to include `scope` so uniqueness is enforced per-scope (or make the index non-unique if global uniqueness is intended). 
[logic error] <details> <summary><b>Severity Level:</b> Critical 🚨</summary> ```mdx - ❌ GTF task creation blocked across scopes. - ⚠️ Deduplication semantics differ from design. ``` </details> ```suggestion create_index(TASKS_TABLE, "idx_tasks_dedup_key", ["scope", "dedup_key"], unique=True) ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Apply the migration by running Alembic upgrade which executes upgrade() in superset/migrations/versions/2025_12_18_0220_create_tasks_table.py (upgrade() starts at line 51) and performs index creation at line 91 (`create_index(... "idx_tasks_dedup_key" ...)`). 2. After migration, insert a task row into the tasks table with dedup_key='K' and scope='private' (the tasks table/columns are created at lines 65-88; dedup_key column defined at line 76). 3. Attempt to insert a second task row with the same dedup_key='K' but scope='shared'. 4. The DB will reject the second insert with a unique constraint/IntegrityError because the unique index created at line 91 enforces uniqueness only on dedup_key across all scopes (expected behavior: allow same dedup_key in different scopes), reproducing the logic bug. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/migrations/versions/2025_12_18_0220_create_tasks_table.py **Line:** 91:91 **Comment:** *Logic Error: Logic bug: deduplication is scope-aware but the migration creates a globally-unique index on `dedup_key`, preventing the same dedup key from being used in different `scope` values; change the index to include `scope` so uniqueness is enforced per-scope (or make the index non-unique if global uniqueness is intended). Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. 
``` </details> ########## superset/commands/tasks/cancel.py: ########## @@ -0,0 +1,301 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Unified cancel task command for GTF.""" + +import logging +from functools import partial +from typing import TYPE_CHECKING + +from superset_core.api.tasks import TaskScope, TaskStatus + +from superset.commands.base import BaseCommand +from superset.commands.tasks.exceptions import ( + TaskAbortFailedError, + TaskNotAbortableError, + TaskNotFoundError, + TaskPermissionDeniedError, +) +from superset.extensions import security_manager +from superset.tasks.locks import task_lock +from superset.tasks.utils import get_active_dedup_key +from superset.utils.core import get_user_id +from superset.utils.decorators import on_error, transaction + +if TYPE_CHECKING: + from superset.models.tasks import Task + +logger = logging.getLogger(__name__) + + +class CancelTaskCommand(BaseCommand): + """ + Unified command to cancel a task. 
+ + Behavior: + - For private tasks or single-subscriber tasks: aborts the task + - For shared tasks with multiple subscribers (non-admin): unsubscribes user + - For shared tasks with force=True (admin only): aborts for all subscribers + + The term "cancel" is user-facing; internally this may abort or unsubscribe. + + This command acquires a distributed lock before starting a transaction to + prevent race conditions with concurrent submit/cancel operations. + + Permission checks are deferred to inside the lock to minimize SELECTs: + we only fetch the task once, then validate permissions on the fetched data. + """ + + def __init__(self, task_uuid: str, force: bool = False): + """ + Initialize the cancel command. + + :param task_uuid: UUID of the task to cancel + :param force: If True, force abort even with multiple subscribers (admin only) + """ + self._task_uuid = task_uuid + self._force = force + self._action_taken: str = ( + "cancelled" # Will be set to 'aborted' or 'unsubscribed' + ) + self._should_publish_abort: bool = False + + def run(self) -> "Task": + """ + Execute the cancel command with distributed locking. + + The lock is acquired BEFORE starting the transaction to avoid holding + a DB connection during lock acquisition. Uses dedup_key as lock key + to ensure Submit and Cancel operations use the same lock. 
+ + :returns: The updated task model + """ + from superset.daos.tasks import TaskDAO + + # Lightweight fetch to compute dedup_key for locking + # This is needed to use the same lock key as SubmitTaskCommand + task = TaskDAO.find_one_or_none( + skip_base_filter=security_manager.is_admin(), uuid=self._task_uuid + ) + + if not task: + raise TaskNotFoundError() + + # Compute dedup_key using the same logic as SubmitTaskCommand + dedup_key = get_active_dedup_key( + scope=task.scope, + task_type=task.task_type, + task_key=task.task_key, + user_id=task.user_id if task.is_private else None, + ) + + # Acquire lock BEFORE transaction starts + # Using dedup_key ensures Submit and Cancel use the same lock + with task_lock(dedup_key): + result = self._execute_with_transaction() + + # Publish abort notification AFTER transaction commits + # This prevents race conditions where listeners check DB before commit + if self._should_publish_abort: + from superset.tasks.manager import TaskManager + + TaskManager.publish_abort(self._task_uuid) + + return result + + @transaction(on_error=partial(on_error, reraise=TaskAbortFailedError)) + def _execute_with_transaction(self) -> "Task": + """ + Execute the cancel operation inside a transaction. + + Combines fetch + validation + execution in a single transaction, + reducing the number of SELECTs from 3 to 1 (plus DAO operations). 
+ + :returns: The updated task model + """ + from superset.daos.tasks import TaskDAO + + # Check admin status (no DB access) + is_admin = security_manager.is_admin() + + # Force flag requires admin + if self._force and not is_admin: + raise TaskPermissionDeniedError( + "Only administrators can force cancel a task" + ) + + # Single SELECT: fetch task and validate permissions on it + task = TaskDAO.find_one_or_none(skip_base_filter=is_admin, uuid=self._task_uuid) + + if not task: + raise TaskNotFoundError() + + # Validate permissions on the fetched task + self._validate_permissions(task, is_admin) + + # Execute cancel and return updated task + return self._do_cancel(task, is_admin) + + def _validate_permissions(self, task: "Task", is_admin: bool) -> None: + """ + Validate permissions on an already-fetched task. + + Permission rules by scope: + - private: Only creator or admin (already filtered by base_filter) + - shared: Subscribers or admin + - system: Only admin + + :param task: The task to validate permissions for + :param is_admin: Whether current user is admin + :raises TaskAbortFailedError: If task is not in cancellable state + :raises TaskPermissionDeniedError: If user lacks permission + """ + # Check if task is in a cancellable state + if task.status not in [ + TaskStatus.PENDING.value, + TaskStatus.IN_PROGRESS.value, + TaskStatus.ABORTING.value, # Already aborting is OK (idempotent) + ]: + raise TaskAbortFailedError() + + # Admin can cancel anything + if is_admin: + return + + # Non-admin permission checks by scope + user_id = get_user_id() + + if task.scope == TaskScope.SYSTEM.value: + # System tasks are admin-only + raise TaskPermissionDeniedError( + "Only administrators can cancel system tasks" + ) + + if task.is_shared: + # Shared tasks: must be a subscriber + if not user_id or not task.has_subscriber(user_id): + raise TaskPermissionDeniedError( + "You must be subscribed to cancel this shared task" + ) + + # Private tasks: already filtered by base_filter 
(only creator can see) + # If we got here, user has permission + + def _do_cancel(self, task: "Task", is_admin: bool) -> "Task": + """ + Execute the cancel operation (abort or unsubscribe). + + :param task: The task to cancel + :param is_admin: Whether current user is admin + :returns: The updated task model + """ + user_id = get_user_id() + + # Determine action based on task scope and force flag + should_abort = ( + # Admin with force flag always aborts + (is_admin and self._force) + # Private tasks always abort (only one user) + or task.is_private + # System tasks always abort (admin only anyway) + or task.is_system + # Single or last subscriber - abort + or task.subscriber_count <= 1 + ) + + if should_abort: + return self._do_abort(task, is_admin) + else: + return self._do_unsubscribe(task, user_id) + + def _do_abort(self, task: "Task", is_admin: bool) -> "Task": + """ + Execute abort operation. + + :param task: The task to abort + :param is_admin: Whether current user is admin + :returns: The updated task model + """ + from superset.daos.tasks import TaskDAO + + try: + result = TaskDAO.abort_task(task.uuid, skip_base_filter=is_admin) + except TaskNotAbortableError: + raise + + if result is None: + # abort_task returned None - task wasn't aborted + # This can happen if task is already finished + raise TaskAbortFailedError() + + self._action_taken = "aborted" + + # Track if we need to publish abort after commit + if result.status == TaskStatus.ABORTING.value: Review Comment: **Suggestion:** Inconsistent TaskStatus comparison: the code checks `result.status == TaskStatus.ABORTING.value`, but `result.status` may be stored as either an enum or a string; this comparison can silently fail to detect an aborting state. Normalize the status to the enum before comparing (e.g., `TaskStatus(result.status) == TaskStatus.ABORTING`). [type error] <details> <summary><b>Severity Level:</b> Major ⚠️</summary> ```mdx - ❌ Abort notifications not published post-abort. 
- ⚠️ TaskManager.publish_abort callers receive no signal. - ⚠️ Subscribers/listeners miss abort events. ``` </details> ```suggestion if TaskStatus(result.status) == TaskStatus.ABORTING: ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Trigger CancelTaskCommand.run() for a task that will reach `_do_abort()` at `superset/commands/tasks/cancel.py:224` (this path is reached when `should_abort` is True in `_do_cancel()` at `superset/commands/tasks/cancel.py:207-222`). 2. `_do_abort()` calls `TaskDAO.abort_task(...)` at `superset/commands/tasks/cancel.py:235` and assigns the returned model to `result`. 3. If the `result.status` attribute is a TaskStatus enum instance (not a string), the equality check at `superset/commands/tasks/cancel.py:247` (`result.status == TaskStatus.ABORTING.value`) will be False, so `self._should_publish_abort` remains False. 4. After the transaction commits, `run()` checks `self._should_publish_abort` at `superset/commands/tasks/cancel.py:112-116` and—because it is False—skips `TaskManager.publish_abort(self._task_uuid)`. The abort notification is not published, so any abort listeners relying on publish/subscribe will not be notified. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/commands/tasks/cancel.py **Line:** 247:247 **Comment:** *Type Error: Inconsistent TaskStatus comparison: the code checks `result.status == TaskStatus.ABORTING.value`, but `result.status` may be stored as either an enum or a string; this comparison can silently fail to detect an aborting state. Normalize the status to the enum before comparing (e.g., `TaskStatus(result.status) == TaskStatus.ABORTING`). Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details> -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
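The enum-normalization pattern recommended in two of the comments above (for `TaskScope` at cancel.py:181 and `TaskStatus` at cancel.py:247) can be illustrated with a standalone sketch. The `TaskStatus` class below is a hypothetical stand-in for the `superset_core.api.tasks` enum, which may be defined differently in the PR:

```python
from enum import Enum


# Hypothetical stand-in for superset_core.api.tasks.TaskStatus; the real
# definition may differ (e.g. it could subclass str, which would change
# the comparison semantics shown here).
class TaskStatus(Enum):
    PENDING = "pending"
    ABORTING = "aborting"


def is_aborting(status):
    # The Enum constructor normalizes both representations: it looks up a
    # raw string by value and passes an existing member through unchanged.
    # (An unknown string would raise ValueError.)
    return TaskStatus(status) == TaskStatus.ABORTING


# The flagged pattern compares a member to a raw string, which is False
# for a plain Enum -- this is the silent-mismatch bug:
assert (TaskStatus.ABORTING == "aborting") is False

# The normalized check handles either stored form:
assert is_aborting("aborting") is True
assert is_aborting(TaskStatus.ABORTING) is True
```

This is why `TaskStatus(result.status) == TaskStatus.ABORTING` is robust regardless of whether the model attribute holds the enum member or the serialized string.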
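The two migration findings (utf8mb4 index byte size and scope-aware uniqueness) can likewise be checked with a self-contained sketch against an in-memory SQLite database. Column names mirror the migration, but the table is trimmed to the relevant columns; since SQLite does not enforce MySQL's index-length limits, the byte arithmetic behind the `String(191)` suggestion is asserted directly:

```python
import sqlalchemy as sa
from sqlalchemy.exc import IntegrityError

# utf8mb4 uses up to 4 bytes per character; InnoDB's classic index-key
# limit is 767 bytes (3072 with DYNAMIC/COMPRESSED row formats).
assert 512 * 4 > 767   # String(512) exceeds the 767-byte limit when indexed
assert 191 * 4 <= 767  # String(191) fits, hence the suggested length

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
tasks = sa.Table(
    "tasks",
    meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("scope", sa.String(20), nullable=False),
    sa.Column("dedup_key", sa.String(191), nullable=False),
    # Composite uniqueness: the same dedup_key may recur across scopes,
    # unlike the globally-unique index the migration currently creates.
    sa.UniqueConstraint("scope", "dedup_key", name="uq_tasks_scope_dedup_key"),
)
meta.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        tasks.insert(),
        [
            {"scope": "private", "dedup_key": "K"},
            {"scope": "shared", "dedup_key": "K"},  # allowed: different scope
        ],
    )

duplicate_rejected = False
try:
    with engine.begin() as conn:
        conn.execute(tasks.insert(), {"scope": "shared", "dedup_key": "K"})
except IntegrityError:
    duplicate_rejected = True  # same (scope, dedup_key) pair is rejected

assert duplicate_rejected
```

Under these assumptions, replacing the single-column unique index with a `(scope, dedup_key)` composite both matches the scope-aware deduplication described in the migration docstring and, combined with the shorter column, keeps the index within MySQL's key-size limits.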
