codeant-ai-for-open-source[bot] commented on code in PR #36368:
URL: https://github.com/apache/superset/pull/36368#discussion_r2748001408


##########
superset/commands/tasks/prune.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
+
+import sqlalchemy as sa
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.commands.base import BaseCommand
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class TaskPruneCommand(BaseCommand):
+    """
+    Command to prune the tasks table by deleting rows older than the specified
+    retention period.
+
+    This command deletes records from the `Task` table that are in terminal states
+    (success, failure, aborted, or timed_out) and have not been changed within the
+    specified number of days. It helps in maintaining the database by removing
+    outdated entries and freeing up space.
+
+    Attributes:
+        retention_period_days (int): The number of days for which records should be retained.
+                                     Records older than this period will be deleted.
+        max_rows_per_run (int | None): The maximum number of rows to delete in a single run.
+                                       If provided and greater than zero, rows are selected
+                                       deterministically from the oldest first (by timestamp then id)
+                                       up to this limit in this execution.
+    """  # noqa: E501
+
+    def __init__(self, retention_period_days: int, max_rows_per_run: int | None = None):
+        """
+        :param retention_period_days: Number of days to keep in the tasks table
+        :param max_rows_per_run: The maximum number of rows to delete in a single run.
+            If provided and greater than zero, rows are selected deterministically from the
+            oldest first (by timestamp then id) up to this limit in this execution.
+        """  # noqa: E501
+        self.retention_period_days = retention_period_days
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has an IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs that need to be deleted
+        # Only delete completed tasks (success, failure, or aborted)
+        from superset.models.tasks import Task
+
+        select_stmt = sa.select(Task.id).where(
+            Task.ended_at < datetime.now() - timedelta(days=self.retention_period_days),

Review Comment:
   **Suggestion:** Potential timezone bug: the code uses naive `datetime.now()` 
to build the cutoff and compares it to `Task.ended_at`. If `Task.ended_at` is 
timezone-aware this can cause incorrect comparisons or DB errors; compute a 
timezone-aware threshold (e.g. `datetime.now(timezone.utc) - ...`) to match 
timezone-aware columns. [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ Incorrect pruning cutoff retains old rows.
   - ⚠️ Scheduled pruning miscalculates retention threshold.
   ```
   </details>
   
   ```suggestion
           from datetime import timezone
           threshold = datetime.now(timezone.utc) - timedelta(days=self.retention_period_days)
           select_stmt = sa.select(Task.id).where(
               Task.ended_at < threshold,
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Create a Task row whose `ended_at` column is timezone-aware (e.g. stored as UTC).

      The Task model is imported at superset/commands/tasks/prune.py:74.

   2. Instantiate and run the prune command:

      cmd = TaskPruneCommand(retention_period_days=30)

      cmd.run() — run() executes the select at superset/commands/tasks/prune.py:76-86.

   3. The code constructs a naive cutoff with datetime.now() at line 77 and compares it
      to Task.ended_at.

   4. If Task.ended_at is timezone-aware (database TIMESTAMP WITH TIME ZONE), the
      naive vs aware comparison can lead to incorrect cutoff semantics or unexpected
      DB-level interpretation, causing incorrect rows to be selected.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/commands/tasks/prune.py
   **Line:** 76:77
   **Comment:**
        *Logic Error: Potential timezone bug: the code uses naive 
`datetime.now()` to build the cutoff and compares it to `Task.ended_at`. If 
`Task.ended_at` is timezone-aware this can cause incorrect comparisons or DB 
errors; compute a timezone-aware threshold (e.g. `datetime.now(timezone.utc) - 
...`) to match timezone-aware columns.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
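To make the hazard concrete, here is a small standalone sketch (independent of Superset, using only the standard library) showing that Python refuses to compare a naive datetime to an aware one, while an aware cutoff compares cleanly:

```python
from datetime import datetime, timedelta, timezone

naive_cutoff = datetime.now() - timedelta(days=30)
aware_ended_at = datetime.now(timezone.utc)  # like a TIMESTAMP WITH TIME ZONE value

# Mixing naive and aware datetimes raises TypeError in pure Python
try:
    print(aware_ended_at < naive_cutoff)
except TypeError as exc:
    print(f"TypeError: {exc}")

# An aware cutoff, as the reviewer suggests, compares without error
aware_cutoff = datetime.now(timezone.utc) - timedelta(days=30)
print(aware_ended_at < aware_cutoff)  # a just-created timestamp is newer than the cutoff → False
```

At the database level the failure mode is subtler (the driver may silently interpret the naive value in an unexpected zone), but the aware cutoff is safe in both cases.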



##########
superset/daos/tasks.py:
##########
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task DAO for Global Task Framework (GTF)"""
+
+import logging
+from datetime import datetime, timezone
+from typing import Any
+
+from superset_core.api.tasks import TaskProperties, TaskScope, TaskStatus
+
+from superset.daos.base import BaseDAO
+from superset.daos.exceptions import DAODeleteFailedError
+from superset.extensions import db
+from superset.models.task_subscribers import TaskSubscriber
+from superset.models.tasks import Task
+from superset.tasks.filters import TaskFilter
+from superset.tasks.utils import get_active_dedup_key
+from superset.utils.decorators import transaction
+
+logger = logging.getLogger(__name__)
+
+
+class TaskDAO(BaseDAO[Task]):
+    """
+    Concrete TaskDAO for the Global Task Framework (GTF).
+
+    Provides database access operations for async tasks including
+    creation, status management, filtering, and subscription management
+    for shared tasks.
+    """
+
+    base_filter = TaskFilter
+
+    @classmethod
+    def find_by_task_key(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+    ) -> Task | None:
+        """
+        Find active task by type, key, scope, and user.
+
+        Uses dedup_key internally for efficient querying with a unique index.
+        Only returns tasks that are active (pending or in progress).
+
+        Uniqueness logic by scope:
+        - private: scope + task_type + task_key + user_id
+        - shared/system: scope + task_type + task_key (user-agnostic)
+
+        :param task_type: Task type to filter by
+        :param task_key: Task identifier for deduplication
+        :param scope: Task scope (private/shared/system)
+        :param user_id: User ID (required for private tasks)
+        :returns: Task instance or None if not found or not active
+        """
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Simple single-column query with unique index
+        return db.session.query(Task).filter(Task.dedup_key == dedup_key).one_or_none()

Review Comment:
   **Suggestion:** Logic bug: `find_by_task_key` claims to return "active" 
tasks (pending or in-progress) but the query only filters by `dedup_key` and 
does not restrict by `status`, so it may return finished tasks that happen to 
share the same dedup key format; add an explicit filter for active statuses. 
[logic error]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ Task deduplication/joining may return finished tasks.
   - ⚠️ SubmitTaskCommand create-vs-join logic affected (referenced in docs).
   ```
   </details>
   
   ```suggestion
           # Simple single-column query with unique index, restrict to active statuses
           return (
               db.session.query(Task)
               .filter(
                   Task.dedup_key == dedup_key,
                   Task.status.in_([TaskStatus.PENDING.value, TaskStatus.IN_PROGRESS.value]),
               )
               .one_or_none()
           )
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Create a test or run a Python REPL that imports TaskDAO from superset/daos/tasks.py
      and Task from superset/models/tasks.py.

   2. Insert a Task row manually via ORM in the DB with dedup_key equal to the active
      format (e.g., call TaskDAO.create_task(...) at superset/daos/tasks.py:84 to create
      a task, then set its status to a finished value like TaskStatus.SUCCESS.value and
      (intentionally) leave dedup_key unchanged).

   3. Call TaskDAO.find_by_task_key(task_type, task_key, scope, user_id) (defined at
      superset/daos/tasks.py:48-55, query at lines 79-80).

   4. Observe that find_by_task_key returns the record even though its status is
      finished, because the current code (superset/daos/tasks.py:79-80) only filters by
      dedup_key and not by status. This demonstrates the reported logic gap.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/daos/tasks.py
   **Line:** 79:80
   **Comment:**
        *Logic Error: Logic bug: `find_by_task_key` claims to return "active" 
tasks (pending or in-progress) but the query only filters by `dedup_key` and 
does not restrict by `status`, so it may return finished tasks that happen to 
share the same dedup key format; add an explicit filter for active statuses.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
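The gap can be reproduced without Superset at all. The sketch below uses stdlib sqlite3 as a plain-SQL analogue of the ORM query (the table, dedup_key format, and status strings are illustrative stand-ins, not the real schema): a dedup_key-only lookup still returns a finished row, while adding the status restriction excludes it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, dedup_key TEXT, status TEXT)")

# A finished task that still carries an active-format dedup_key
conn.execute(
    "INSERT INTO tasks (dedup_key, status) VALUES (?, ?)",
    ("private:type:key:1", "success"),
)

# dedup_key-only lookup (mirrors the current query): returns the finished task
row = conn.execute(
    "SELECT id FROM tasks WHERE dedup_key = ?", ("private:type:key:1",)
).fetchone()
print(row is not None)  # True

# With the status restriction the reviewer suggests, the finished task is excluded
row = conn.execute(
    "SELECT id FROM tasks WHERE dedup_key = ? AND status IN ('pending', 'in_progress')",
    ("private:type:key:1",),
).fetchone()
print(row is None)  # True
```

Whether this can occur in practice depends on whether finished tasks keep the active dedup_key format; if dedup_key is rewritten on completion, the extra filter is a cheap safety net rather than a required fix.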



##########
superset/commands/tasks/prune.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
+
+import sqlalchemy as sa
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.commands.base import BaseCommand
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class TaskPruneCommand(BaseCommand):
+    """
+    Command to prune the tasks table by deleting rows older than the specified
+    retention period.
+
+    This command deletes records from the `Task` table that are in terminal states
+    (success, failure, aborted, or timed_out) and have not been changed within the
+    specified number of days. It helps in maintaining the database by removing
+    outdated entries and freeing up space.
+
+    Attributes:
+        retention_period_days (int): The number of days for which records should be retained.
+                                     Records older than this period will be deleted.
+        max_rows_per_run (int | None): The maximum number of rows to delete in a single run.
+                                       If provided and greater than zero, rows are selected
+                                       deterministically from the oldest first (by timestamp then id)
+                                       up to this limit in this execution.
+    """  # noqa: E501
+
+    def __init__(self, retention_period_days: int, max_rows_per_run: int | None = None):
+        """
+        :param retention_period_days: Number of days to keep in the tasks table
+        :param max_rows_per_run: The maximum number of rows to delete in a single run.
+            If provided and greater than zero, rows are selected deterministically from the
+            oldest first (by timestamp then id) up to this limit in this execution.
+        """  # noqa: E501
+        self.retention_period_days = retention_period_days
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has an IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs that need to be deleted
+        # Only delete completed tasks (success, failure, or aborted)
+        from superset.models.tasks import Task
+
+        select_stmt = sa.select(Task.id).where(
+            Task.ended_at < datetime.now() - timedelta(days=self.retention_period_days),
+            Task.status.in_(
+                [
+                    TaskStatus.SUCCESS.value,
+                    TaskStatus.FAILURE.value,
+                    TaskStatus.ABORTED.value,
+                    TaskStatus.TIMED_OUT.value,
+                ]
+            ),
+        )
+
+        # Optionally limited by max_rows_per_run
+        # order by oldest first for deterministic deletion
+        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
+            select_stmt = select_stmt.order_by(
+                Task.ended_at.asc(), Task.id.asc()
+            ).limit(self.max_rows_per_run)
+
+        ids_to_delete = db.session.execute(select_stmt).scalars().all()
+
+        total_rows = len(ids_to_delete)
+
+        logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
+
+        next_logging_threshold = 1
+
+        # Iterate over the IDs in batches
+        for i in range(0, total_rows, batch_size):
+            batch_ids = ids_to_delete[i : i + batch_size]
+
+            # Delete the selected batch using IN clause
+            result = db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
+
+            # Update the total number of deleted records
+            total_deleted += result.rowcount
+
+            # Explicitly commit the transaction given that if an error occurs, we want to ensure that the  # noqa: E501
+            # records that have been deleted so far are committed
+            db.session.commit()
+
+            # Log the number of deleted records every 1% increase in progress
+            percentage_complete = (total_deleted / total_rows) * 100
+            if percentage_complete >= next_logging_threshold:
+                logger.info(
+                    "Deleted %s rows from the tasks table older than %s days (%d%% complete)",  # noqa: E501
+                    f"{total_deleted:,}",
+                    self.retention_period_days,
+                    percentage_complete,
+                )
+                next_logging_threshold += 1
+
+        elapsed_time = time.time() - start_time
+        minutes, seconds = divmod(elapsed_time, 60)
+        formatted_time = f"{int(minutes):02}:{int(seconds):02}"
+        logger.info(
+            "Pruning complete: %s rows deleted in %s",
+            f"{total_deleted:,}",
+            formatted_time,
+        )
+
+    def validate(self) -> None:
+        pass

Review Comment:
   **Suggestion:** Missing validation: `validate()` is a no-op and 
`retention_period_days` (or `max_rows_per_run`) could be set to invalid values 
(negative or non-integer), which can cause unintended mass deletions; implement 
checks to ensure `retention_period_days` is a non-negative integer and 
`max_rows_per_run` is either None or a positive integer. [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Misconfiguration can delete almost all task rows.
   - ⚠️ Admin-scheduled prune job becomes destructive.
   ```
   </details>
   
   ```suggestion
           # retention_period_days must be a non-negative integer
           if not isinstance(self.retention_period_days, int) or self.retention_period_days < 0:
               raise ValueError("retention_period_days must be a non-negative integer")
           # max_rows_per_run must be None or a positive integer
           if self.max_rows_per_run is not None and (
               not isinstance(self.max_rows_per_run, int) or self.max_rows_per_run <= 0
           ):
               raise ValueError("max_rows_per_run must be a positive integer or None")
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Instantiate the command with an invalid retention, e.g.
      TaskPruneCommand(retention_period_days=-1)
      (constructor at superset/commands/tasks/prune.py:54-63).

   2. Call cmd.run() (method defined at superset/commands/tasks/prune.py:64). The select
      cutoff is computed at superset/commands/tasks/prune.py:76 as
      datetime.now() - timedelta(days=self.retention_period_days).

   3. With retention_period_days = -1 the cutoff becomes datetime.now() + 1 day, so the
      WHERE clause at lines 76-86 matches almost all rows (Task.ended_at < tomorrow).

   4. The loop at lines 104-126 will therefore delete a far larger set of Task rows than
      intended (potentially all), demonstrating the missing validation leads to dangerous
      mass deletions.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/commands/tasks/prune.py
   **Line:** 138:138
   **Comment:**
        *Logic Error: Missing validation: `validate()` is a no-op and 
`retention_period_days` (or `max_rows_per_run`) could be set to invalid values 
(negative or non-integer), which can cause unintended mass deletions; implement 
checks to ensure `retention_period_days` is a non-negative integer and 
`max_rows_per_run` is either None or a positive integer.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
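The core of the danger is a one-liner: subtracting a negative timedelta pushes the cutoff into the future. A minimal stdlib sketch (the -1 value is a hypothetical misconfiguration, not something the PR does):

```python
from datetime import datetime, timedelta

# Hypothetical misconfiguration: a negative retention period
retention_period_days = -1
cutoff = datetime.now() - timedelta(days=retention_period_days)

# Subtracting a negative timedelta yields a FUTURE cutoff, so a predicate like
# "ended_at < cutoff" would match essentially every finished task row.
print(cutoff > datetime.now())  # True
```

Validating in validate() (rather than run()) also matters because BaseCommand callers conventionally invoke validate() before run(), so the bad value is rejected before any SQL executes.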



##########
tests/unit_tests/daos/test_tasks.py:
##########
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License.  You may obtain
+# a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+from superset_core.api.tasks import TaskStatus
+
+from superset.daos.tasks import TaskDAO
+from superset.models.tasks import Task
+
+
+class TestTaskDAO:
+    """Test TaskDAO functionality"""
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_active(self, mock_session, mock_get_user_id):
+        """Test finding active task by task_key"""
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.task_key = "test-id"
+        mock_task.task_type = "test_type"
+        mock_task.status = TaskStatus.PENDING.value
+
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = mock_task
+
+        result = TaskDAO.find_by_task_key("test_type", "test-key")
+
+        assert result == mock_task
+        mock_session.query.assert_called_once_with(Task)
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_not_found(self, mock_session, mock_get_user_id):
+        """Test finding task by task_key returns None when not found"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        result = TaskDAO.find_by_task_key("test_type", "nonexistent-key")
+
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_ignores_finished_tasks(
+        self, mock_session, mock_get_user_id
+    ):
+        """Test that find_by_task_key only returns pending/in-progress tasks"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        # Should not find SUCCESS task
+        result = TaskDAO.find_by_task_key("test_type", "finished-key")
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_success(self, mock_session, mock_create, mock_get_user_id):
+        """Test successful task creation.
+
+        TaskDAO.create_task is now a pure data operation - it assumes the caller
+        (SubmitTaskCommand) has already checked for duplicates and holds the lock.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+        )
+
+        assert result == mock_task
+        mock_create.assert_called_once()
+        mock_session.commit.assert_called_once()
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_with_user_id(
+        self, mock_session, mock_create, mock_get_user_id
+    ):
+        """Test task creation with explicit user_id.
+
+        TaskDAO.create_task should use the provided user_id for the task.
+        """
+        mock_get_user_id.return_value = 99  # Different from provided user_id
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+            user_id=42,  # Explicit user_id
+        )
+
+        assert result == mock_task
+        # Verify create was called with user_id in attributes
+        call_args = mock_create.call_args
+        assert call_args[1]["attributes"]["user_id"] == 42
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_with_properties(
+        self, mock_session, mock_create, mock_get_user_id
+    ):
+        """Test task creation with properties.
+
+        Properties are set via update_properties() after task creation.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+            properties={"timeout": 300},
+        )
+
+        assert result == mock_task
+        mock_task.update_properties.assert_called_once_with({"timeout": 300})
+
+    @patch("superset.daos.tasks.TaskDAO.find_one_or_none")
+    @patch("superset.daos.tasks.db.session")
+    def test_abort_task_pending_success(self, mock_session, mock_find):
+        """Test successful abort of pending task - goes directly to ABORTED"""
+        mock_task = MagicMock(spec=Task)
+        mock_task.status = TaskStatus.PENDING.value
+        mock_task.is_shared = False
+        mock_task.subscriber_count = 0
+        mock_find.return_value = mock_task
+
+        result = TaskDAO.abort_task("test-uuid")
+
+        # Now returns Task instead of bool
+        assert result is mock_task
+        mock_task.set_status.assert_called_once_with(TaskStatus.ABORTED)
+        mock_session.commit.assert_called_once()
+
+    @patch("superset.daos.tasks.TaskDAO.find_one_or_none")
+    @patch("superset.daos.tasks.db.session")
+    def test_abort_task_in_progress_abortable(self, mock_session, mock_find):
+        """Test abort of in-progress task with abort handler.
+
+        Should transition to ABORTING status.
+        """
+        mock_task = MagicMock(spec=Task)
+        mock_task.status = TaskStatus.IN_PROGRESS.value
+        mock_task.properties = {"is_abortable": True}  # Dict, not MagicMock
+        mock_task.is_shared = False
+        mock_task.subscriber_count = 0
+        mock_find.return_value = mock_task
+
+        result = TaskDAO.abort_task("test-uuid")
+
+        # Now returns Task instead of bool
+        assert result is mock_task
+        # Should set status to ABORTING, not ABORTED
+        assert mock_task.status == TaskStatus.ABORTING.value
+        mock_session.merge.assert_called_once_with(mock_task)
+        mock_session.commit.assert_called_once()

Review Comment:
   **Suggestion:** The test for aborting an in-progress abortable task expects 
a commit after transitioning to ABORTING, but `TaskDAO.abort_task` only merges 
the task and returns without committing; change the commit assertion to 
`assert_not_called()` to match the implementation. [possible bug]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Unit test fails in tests/unit_tests/daos/test_tasks.py.
   - ⚠️ CI run will be blocked by this failing test.
   ```
   </details>
   
   ```suggestion
           mock_session.commit.assert_not_called()
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Run the test: pytest
      tests/unit_tests/daos/test_tasks.py::TestTaskDAO::test_abort_task_in_progress_abortable.
      The test invokes TaskDAO.abort_task at tests/unit_tests/daos/test_tasks.py:193 and
      asserts commit at line 200.

   2. The abort path under test is IN_PROGRESS and abortable; execution goes to
      superset/daos/tasks.py:171-233 (abort_task). There the code sets
      task.status = TaskStatus.ABORTING.value and calls db.session.merge(task), then
      returns — it does not call db.session.commit() in that code path.

   3. Because the test patches superset.daos.tasks.db.session, mock_session.merge will
      be called (tests assert that), but mock_session.commit will not. The test's
      assertion at tests/unit_tests/daos/test_tasks.py:200 expecting commit therefore
      fails.

   4. Change the test to assert commit was not called (or intentionally add a commit in
      the DAO) to align test expectations with the implemented abort_task behavior.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/unit_tests/daos/test_tasks.py
   **Line:** 200:200
   **Comment:**
        *Possible Bug: The test for aborting an in-progress abortable task 
expects a commit after transitioning to ABORTING, but `TaskDAO.abort_task` only 
merges the task and returns without committing; change the commit assertion to 
`assert_not_called()` to match the implementation.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
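The proposed assertion swap can be sanity-checked in isolation. This stdlib sketch simulates the abort path as the reviewer describes it (merge without commit; the session and task objects here are bare mocks, not Superset's):

```python
from unittest.mock import MagicMock

# Simulate the abort path: the DAO merges the task but never commits the session
session = MagicMock()
task = MagicMock()
session.merge(task)

session.merge.assert_called_once_with(task)  # passes: merge happened exactly once
session.commit.assert_not_called()           # passes: commit was never invoked

# By contrast, session.commit.assert_called_once() would raise AssertionError here,
# which is exactly why the test as currently written fails.
print("assertions passed")
```

If committing on the ABORTING transition is actually the intended behavior, the fix belongs in abort_task rather than the test; the suggestion assumes the DAO's current merge-only behavior is correct.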



##########
superset/daos/tasks.py:
##########
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task DAO for Global Task Framework (GTF)"""
+
+import logging
+from datetime import datetime, timezone
+from typing import Any
+
+from superset_core.api.tasks import TaskProperties, TaskScope, TaskStatus
+
+from superset.daos.base import BaseDAO
+from superset.daos.exceptions import DAODeleteFailedError
+from superset.extensions import db
+from superset.models.task_subscribers import TaskSubscriber
+from superset.models.tasks import Task
+from superset.tasks.filters import TaskFilter
+from superset.tasks.utils import get_active_dedup_key
+from superset.utils.decorators import transaction
+
+logger = logging.getLogger(__name__)
+
+
+class TaskDAO(BaseDAO[Task]):
+    """
+    Concrete TaskDAO for the Global Task Framework (GTF).
+
+    Provides database access operations for async tasks including
+    creation, status management, filtering, and subscription management
+    for shared tasks.
+    """
+
+    base_filter = TaskFilter
+
+    @classmethod
+    def find_by_task_key(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+    ) -> Task | None:
+        """
+        Find active task by type, key, scope, and user.
+
+        Uses dedup_key internally for efficient querying with a unique index.
+        Only returns tasks that are active (pending or in progress).
+
+        Uniqueness logic by scope:
+        - private: scope + task_type + task_key + user_id
+        - shared/system: scope + task_type + task_key (user-agnostic)
+
+        :param task_type: Task type to filter by
+        :param task_key: Task identifier for deduplication
+        :param scope: Task scope (private/shared/system)
+        :param user_id: User ID (required for private tasks)
+        :returns: Task instance or None if not found or not active
+        """
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Simple single-column query with unique index
+        return db.session.query(Task).filter(Task.dedup_key == dedup_key).one_or_none()
+
+    @classmethod
+    @transaction()
+    def create_task(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+        payload: dict[str, Any] | None = None,
+        properties: TaskProperties | None = None,
+        **kwargs: Any,
+    ) -> Task:
+        """
+        Create a new task record in the database.
+
+        This is a pure data operation - assumes caller holds lock and has
+        already checked for existing tasks. Business logic (create vs join)
+        is handled by SubmitTaskCommand.
+
+        :param task_type: Type of task to create
+        :param task_key: Task identifier (required)
+        :param scope: Task scope (private/shared/system), defaults to private
+        :param user_id: User ID creating the task
+        :param payload: Optional user-defined context data (dict)
+        :param properties: Optional framework-managed runtime state (e.g., timeout)
+        :param kwargs: Additional task attributes (e.g., task_name)
+        :returns: Created Task instance
+        """
+        # Build dedup_key for active task
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Handle both TaskScope enum and string values
+        scope_value = scope.value if isinstance(scope, TaskScope) else scope
+
+        # Note: properties is handled separately via update_properties()
+        # because it's a hybrid property with only a getter
+        task_data = {
+            "task_type": task_type,
+            "task_key": task_key,
+            "scope": scope_value,
+            "status": TaskStatus.PENDING.value,
+            "dedup_key": dedup_key,
+            **kwargs,
+        }
+
+        # Handle payload - serialize to JSON if dict provided
+        if payload:

Review Comment:
   **Suggestion:** Bug with falsy payloads: the code uses `if payload:` which 
skips serializing valid but falsy payloads (e.g., empty dict or empty list). 
Use an explicit `is not None` check to preserve intentionally empty payloads. 
[logic error]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ Intentionally empty task payloads are not persisted.
   - ⚠️ Downstream task consumers miss expected context.
   ```
   </details>
   
   ```suggestion
            # Handle payload - serialize to JSON if dict provided (allow empty payloads)
           if payload is not None:
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. In a test or integration scenario call TaskDAO.create_task(...) at
   superset/daos/tasks.py:84 with payload={} (explicit empty dict).
   
   2. The create_task function execution reaches the payload handling block at
   superset/daos/tasks.py:132-136.
   
   3. With the current condition `if payload:` (superset/daos/tasks.py:133), an 
empty dict
   evaluates to False and the code path that sets task_data["payload"] is 
skipped.
   
   4. After insertion, inspect the Task row in the DB or task.payload attribute 
and observe
   the payload field is not set, demonstrating loss of intentionally provided 
empty payloads.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/daos/tasks.py
   **Line:** 132:133
   **Comment:**
        *Logic Error: Bug with falsy payloads: the code uses `if payload:` 
which skips serializing valid but falsy payloads (e.g., empty dict or empty 
list). Use an explicit `is not None` check to preserve intentionally empty 
payloads.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
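Side note: the falsiness pitfall flagged above is easy to reproduce in isolation. The sketch below is illustrative (the helper names are made up, not from the PR) and only contrasts `if payload:` with `if payload is not None:`:

```python
import json

def build_row_falsy(payload):
    # Mirrors the flagged pattern: an empty dict is falsy, so it is dropped
    row = {}
    if payload:
        row["payload"] = json.dumps(payload)
    return row

def build_row_explicit(payload):
    # Suggested fix: only skip when the caller passed no payload at all
    row = {}
    if payload is not None:
        row["payload"] = json.dumps(payload)
    return row

print(build_row_falsy({}))       # {} - empty payload silently lost
print(build_row_explicit({}))    # {'payload': '{}'} - empty payload preserved
print(build_row_explicit(None))  # {} - absent payload still skipped
```

The same reasoning applies to any falsy-but-valid value (empty list, `0`, `""`), which is why the explicit `is not None` check is the safer idiom.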



##########
tests/unit_tests/daos/test_tasks.py:
##########
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License.  You may obtain
+# a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+from superset_core.api.tasks import TaskStatus
+
+from superset.daos.tasks import TaskDAO
+from superset.models.tasks import Task
+
+
+class TestTaskDAO:
+    """Test TaskDAO functionality"""
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_active(self, mock_session, mock_get_user_id):
+        """Test finding active task by task_key"""
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.task_key = "test-id"
+        mock_task.task_type = "test_type"
+        mock_task.status = TaskStatus.PENDING.value
+
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = mock_task
+
+        result = TaskDAO.find_by_task_key("test_type", "test-key")
+
+        assert result == mock_task
+        mock_session.query.assert_called_once_with(Task)
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_not_found(self, mock_session, mock_get_user_id):
+        """Test finding task by task_key returns None when not found"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        result = TaskDAO.find_by_task_key("test_type", "nonexistent-key")
+
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_ignores_finished_tasks(
+        self, mock_session, mock_get_user_id
+    ):
+        """Test that find_by_task_key only returns pending/in-progress tasks"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        # Should not find SUCCESS task
+        result = TaskDAO.find_by_task_key("test_type", "finished-key")
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_success(self, mock_session, mock_create, mock_get_user_id):
+        """Test successful task creation.
+
+        TaskDAO.create_task is now a pure data operation - it assumes the caller
+        (SubmitTaskCommand) has already checked for duplicates and holds the lock.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+        )
+
+        assert result == mock_task
+        mock_create.assert_called_once()
+        mock_session.commit.assert_called_once()
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_with_user_id(
+        self, mock_session, mock_create, mock_get_user_id
+    ):
+        """Test task creation with explicit user_id.
+
+        TaskDAO.create_task should use the provided user_id for the task.
+        """
+        mock_get_user_id.return_value = 99  # Different from provided user_id
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+            user_id=42,  # Explicit user_id
+        )
+
+        assert result == mock_task
+        # Verify create was called with user_id in attributes
+        call_args = mock_create.call_args
+        assert call_args[1]["attributes"]["user_id"] == 42
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_with_properties(
+        self, mock_session, mock_create, mock_get_user_id
+    ):
+        """Test task creation with properties.
+
+        Properties are set via update_properties() after task creation.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+            properties={"timeout": 300},
+        )
+
+        assert result == mock_task
+        mock_task.update_properties.assert_called_once_with({"timeout": 300})
+
+    @patch("superset.daos.tasks.TaskDAO.find_one_or_none")
+    @patch("superset.daos.tasks.db.session")
+    def test_abort_task_pending_success(self, mock_session, mock_find):
+        """Test successful abort of pending task - goes directly to ABORTED"""
+        mock_task = MagicMock(spec=Task)
+        mock_task.status = TaskStatus.PENDING.value
+        mock_task.is_shared = False
+        mock_task.subscriber_count = 0
+        mock_find.return_value = mock_task
+
+        result = TaskDAO.abort_task("test-uuid")
+
+        # Now returns Task instead of bool
+        assert result is mock_task
+        mock_task.set_status.assert_called_once_with(TaskStatus.ABORTED)
+        mock_session.commit.assert_called_once()

Review Comment:
   **Suggestion:** The test for aborting a pending task asserts that 
`db.session.commit()` was called, but `TaskDAO.abort_task` for PENDING tasks 
only sets the status and returns without committing; change the assertion to 
assert the commit was not called. [possible bug]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Unit test fails in tests/unit_tests/daos/test_tasks.py.
   - ⚠️ CI run will be blocked by this failing test.
   ```
   </details>
   
   ```suggestion
           mock_session.commit.assert_not_called()
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Run the test: pytest
   
tests/unit_tests/daos/test_tasks.py::TestTaskDAO::test_abort_task_pending_success.
 The
   test calls TaskDAO.abort_task at tests/unit_tests/daos/test_tasks.py:172 and 
asserts
   commit at line 177.
   
   2. The test patches TaskDAO.find_one_or_none and db.session; 
TaskDAO.abort_task is
   implemented in superset/daos/tasks.py:171-233 (function abort_task).
   
   3. In abort_task (superset/daos/tasks.py:171-233) the PENDING branch sets
   task.set_status(TaskStatus.ABORTED) and returns the task, but there is no
   db.session.commit() call in that code path — only logging and status change.
   
   4. Therefore the test's expectation that mock_session.commit was called 
(tests line 177)
   is incorrect; asserting commit was not called (or adjusting the DAO to commit
   intentionally) reproduces and resolves the mismatch.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/unit_tests/daos/test_tasks.py
   **Line:** 177:177
   **Comment:**
        *Possible Bug: The test for aborting a pending task asserts that 
`db.session.commit()` was called, but `TaskDAO.abort_task` for PENDING tasks 
only sets the status and returns without committing; change the assertion to 
assert the commit was not called.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
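The proposed assertion change can be sanity-checked with a stdlib-only mock sketch (a stand-in for the PENDING branch, not the actual `TaskDAO.abort_task` code):

```python
from unittest.mock import MagicMock

def abort_pending(task, session):
    # Stand-in for the PENDING branch described in the comment:
    # set the status and return, without touching the session
    task.set_status("ABORTED")
    return task

session = MagicMock()
task = MagicMock()
result = abort_pending(task, session)

assert result is task
task.set_status.assert_called_once_with("ABORTED")
session.commit.assert_not_called()  # raises AssertionError if commit was called
print("assertions passed")
```

`assert_not_called()` is the precise way to pin down "the implementation does not commit here"; simply dropping the commit assertion would leave the behavior untested.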



##########
tests/unit_tests/daos/test_tasks.py:
##########
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License.  You may obtain
+# a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+from superset_core.api.tasks import TaskStatus
+
+from superset.daos.tasks import TaskDAO
+from superset.models.tasks import Task
+
+
+class TestTaskDAO:
+    """Test TaskDAO functionality"""
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_active(self, mock_session, mock_get_user_id):
+        """Test finding active task by task_key"""
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.task_key = "test-id"
+        mock_task.task_type = "test_type"
+        mock_task.status = TaskStatus.PENDING.value
+
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = mock_task
+
+        result = TaskDAO.find_by_task_key("test_type", "test-key")
+
+        assert result == mock_task
+        mock_session.query.assert_called_once_with(Task)
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_not_found(self, mock_session, mock_get_user_id):
+        """Test finding task by task_key returns None when not found"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        result = TaskDAO.find_by_task_key("test_type", "nonexistent-key")
+
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.db.session")
+    def test_find_by_task_key_ignores_finished_tasks(
+        self, mock_session, mock_get_user_id
+    ):
+        """Test that find_by_task_key only returns pending/in-progress tasks"""
+        mock_get_user_id.return_value = 1
+        mock_query = MagicMock()
+        mock_session.query.return_value = mock_query
+        mock_filter = MagicMock()
+        mock_query.filter.return_value = mock_filter
+        mock_filter.one_or_none.return_value = None
+
+        # Should not find SUCCESS task
+        result = TaskDAO.find_by_task_key("test_type", "finished-key")
+        assert result is None
+
+    @patch("superset.utils.core.get_user_id")
+    @patch("superset.daos.tasks.TaskDAO.create")
+    @patch("superset.daos.tasks.db.session")
+    def test_create_task_success(self, mock_session, mock_create, mock_get_user_id):
+        """Test successful task creation.
+
+        TaskDAO.create_task is now a pure data operation - it assumes the caller
+        (SubmitTaskCommand) has already checked for duplicates and holds the lock.
+        """
+        mock_get_user_id.return_value = 1
+        mock_task = MagicMock(spec=Task)
+        mock_task.id = 1
+        mock_task.task_key = "new-task"
+        mock_task.task_type = "test_type"
+        mock_create.return_value = mock_task
+
+        result = TaskDAO.create_task(
+            task_type="test_type",
+            task_key="new-task",
+        )
+
+        assert result == mock_task
+        mock_create.assert_called_once()
+        mock_session.commit.assert_called_once()

Review Comment:
   **Suggestion:** The test incorrectly expects a DB transaction commit from 
`TaskDAO.create_task`, but the implementation only calls `db.session.flush()` 
(no commit). Replace the failing `commit` assertion with a check for `flush` to 
match the DAO behavior. [possible bug]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Unit test fails in tests/unit_tests/daos/test_tasks.py.
   - ⚠️ CI run will be blocked by this failing test.
   ```
   </details>
   
   ```suggestion
           mock_session.flush.assert_called_once()
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Run the single unit test: pytest
   tests/unit_tests/daos/test_tasks.py::TestTaskDAO::test_create_task_success. 
The test file
   line invoking the DAO is at tests/unit_tests/daos/test_tasks.py:99 (result =
   TaskDAO.create_task(...)) and the failing assertion is at line 106
   (mock_session.commit.assert_called_once()).
   
   2. The test patches the DB session via 
@patch("superset.daos.tasks.db.session"), so the
   create_task call in tests routes to superset/daos/tasks.py:create_task 
(function span
   superset/daos/tasks.py:84-167).
   
   3. Inspect superset/daos/tasks.py:create_task (lines 84-167) — the function 
calls
   db.session.flush() (to obtain the new task id) but does not call 
db.session.commit(). This
   is the implemented behavior.
   
   4. Because the test asserts commit at 
tests/unit_tests/daos/test_tasks.py:106 but the DAO
   only flushes, the assertion fails. Updating the test to assert flush (or 
remove the commit
   assertion) matches the actual implementation.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/unit_tests/daos/test_tasks.py
   **Line:** 106:106
   **Comment:**
        *Possible Bug: The test incorrectly expects a DB transaction commit 
from `TaskDAO.create_task`, but the implementation only calls 
`db.session.flush()` (no commit). Replace the failing `commit` assertion with a 
check for `flush` to match the DAO behavior.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
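If the test is updated as suggested, the expectation it encodes looks like this minimal sketch (again a stand-in mirroring the flush-then-return shape described for `create_task`, not the DAO itself):

```python
from unittest.mock import MagicMock

def create_task_like(session, task):
    # Stand-in for the DAO: flush so the new row gets its primary key,
    # but leave the commit to the caller / transaction decorator
    session.add(task)
    session.flush()
    return task

session = MagicMock()
task = MagicMock()
create_task_like(session, task)

session.flush.assert_called_once()
session.commit.assert_not_called()
print("flush asserted, commit not called")
```

In SQLAlchemy, `flush()` sends pending INSERTs to the database (assigning server-generated IDs) without ending the transaction, so asserting on `flush` rather than `commit` matches the DAO behavior described above.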



##########
superset/commands/tasks/prune.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
+
+import sqlalchemy as sa
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.commands.base import BaseCommand
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class TaskPruneCommand(BaseCommand):
+    """
+    Command to prune the tasks table by deleting rows older than the specified
+    retention period.
+
+    This command deletes records from the `Task` table that are in terminal states
+    (success, failure, aborted, or timed_out) and have not been changed within the
+    specified number of days. It helps in maintaining the database by removing
+    outdated entries and freeing up space.
+
+    Attributes:
+        retention_period_days (int): The number of days for which records should be retained.
+                                     Records older than this period will be deleted.
+        max_rows_per_run (int | None): The maximum number of rows to delete in a single run.
+                                       If provided and greater than zero, rows are selected
+                                       deterministically from the oldest first (by timestamp then id)
+                                       up to this limit in this execution.
+    """  # noqa: E501
+
+    def __init__(self, retention_period_days: int, max_rows_per_run: int | None = None):
+        """
+        :param retention_period_days: Number of days to keep in the tasks table
+        :param max_rows_per_run: The maximum number of rows to delete in a single run.
+            If provided and greater than zero, rows are selected deterministically from the
+            oldest first (by timestamp then id) up to this limit in this execution.
+        """  # noqa: E501
+        self.retention_period_days = retention_period_days
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has an IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs that need to be deleted
+        # Only delete completed tasks (success, failure, or aborted)
+        from superset.models.tasks import Task
+
+        select_stmt = sa.select(Task.id).where(
+            Task.ended_at < datetime.now() - timedelta(days=self.retention_period_days),
+            Task.status.in_(
+                [
+                    TaskStatus.SUCCESS.value,
+                    TaskStatus.FAILURE.value,
+                    TaskStatus.ABORTED.value,
+                    TaskStatus.TIMED_OUT.value,
+                ]
+            ),
+        )
+
+        # Optionally limited by max_rows_per_run
+        # order by oldest first for deterministic deletion
+        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
+            select_stmt = select_stmt.order_by(
+                Task.ended_at.asc(), Task.id.asc()
+            ).limit(self.max_rows_per_run)
+
+        ids_to_delete = db.session.execute(select_stmt).scalars().all()
+
+        total_rows = len(ids_to_delete)
+
+        logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
+
+        next_logging_threshold = 1
+
+        # Iterate over the IDs in batches
+        for i in range(0, total_rows, batch_size):
+            batch_ids = ids_to_delete[i : i + batch_size]
+
+            # Delete the selected batch using IN clause
+            result = db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
+
+            # Update the total number of deleted records
+            total_deleted += result.rowcount
+
+            # Explicitly commit the transaction given that if an error occurs, we want to ensure that the  # noqa: E501
+            # records that have been deleted so far are committed
+            db.session.commit()
+
+            # Log the number of deleted records every 1% increase in progress
+            percentage_complete = (total_deleted / total_rows) * 100
+            if percentage_complete >= next_logging_threshold:
+                logger.info(
+                    "Deleted %s rows from the tasks table older than %s days (%d%% complete)",  # noqa: E501
+                    f"{total_deleted:,}",
+                    self.retention_period_days,
+                    percentage_complete,
+                )
+                next_logging_threshold += 1

Review Comment:
   **Suggestion:** Resource exhaustion: the code loads all matching task IDs 
into memory via `.scalars().all()` which can OOM on large tables; instead 
stream IDs from the DB and delete in batches (use a count query for progress, 
iterate over `Result.scalars()` and flush batches) to avoid building a huge 
list. [memory leak]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Prune command may OOM preventing DB cleanup.
   - ⚠️ Scheduled maintenance tasks may fail (TaskPruneCommand).
   - ⚠️ Database storage growth due to retained rows.
   ```
   </details>
   
   ```suggestion
            # Compute total_rows efficiently (respect max_rows_per_run if provided)
            count_stmt = sa.select(sa.func.count()).select_from(Task).where(
                Task.ended_at < datetime.now() - timedelta(days=self.retention_period_days),
                Task.status.in_(
                    [
                        TaskStatus.SUCCESS.value,
                        TaskStatus.FAILURE.value,
                        TaskStatus.ABORTED.value,
                        TaskStatus.TIMED_OUT.value,
                    ]
                ),
            )
            total_rows = db.session.execute(count_stmt).scalar() or 0
            if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
                total_rows = min(total_rows, self.max_rows_per_run)

            logger.info("Total rows to be deleted: %s", f"{total_rows:,}")

            next_logging_threshold = 1

            # Stream IDs and delete in batches to avoid loading all IDs into memory
            result = db.session.execute(select_stmt)
            batch_ids: list[int] = []
            for task_id in result.scalars():
                batch_ids.append(task_id)
                if len(batch_ids) >= batch_size:
                    res = db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
                    deleted = res.rowcount or 0
                    total_deleted += deleted
                    db.session.commit()

                    percentage_complete = (total_deleted / total_rows) * 100 if total_rows else 100
                    if percentage_complete >= next_logging_threshold:
                        logger.info(
                            "Deleted %s rows from the tasks table older than %s days (%.2f%% complete)",
                            f"{total_deleted:,}",
                            self.retention_period_days,
                            percentage_complete,
                        )
                        next_logging_threshold += 1
                    batch_ids = []

            # Final batch if any
            if batch_ids:
                res = db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
                deleted = res.rowcount or 0
                total_deleted += deleted
                db.session.commit()
                percentage_complete = (total_deleted / total_rows) * 100 if total_rows else 100
                if percentage_complete >= next_logging_threshold:
                    logger.info(
                        "Deleted %s rows from the tasks table older than %s days (%.2f%% complete)",
                        f"{total_deleted:,}",
                        self.retention_period_days,
                        percentage_complete,
                    )
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Populate the tasks table with a large number of terminal Task rows (e.g. 
>100k).
   
      Insert rows into the Task model referenced by `from superset.models.tasks 
import Task`
   
      (see superset/commands/tasks/prune.py:74 where Task is imported).
   
   2. In a Python process, import and run the command directly:
   
      from superset.commands.tasks.prune import TaskPruneCommand
   
      cmd = TaskPruneCommand(retention_period_days=1)
   
      cmd.run()
   
      (run() is defined at superset/commands/tasks/prune.py:64 and will execute 
the code at
      lines 95-126).
   
   3. When execution reaches line 95 (`ids_to_delete =
   db.session.execute(select_stmt).scalars().all()`)
   
      the code will materialize all matching IDs into memory.
   
   4. Observe process memory growth and eventual OOM / memory pressure when the 
result set is
   very large,
   
      causing the prune run to crash or the host to kill the process.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/commands/tasks/prune.py
   **Line:** 95:126
   **Comment:**
     *Memory Leak: Resource exhaustion: the code loads all matching task IDs into memory via `.scalars().all()`, which can OOM on large tables; instead, stream IDs from the DB and delete in batches (use a count query for progress, iterate over `Result.scalars()`, and flush batches) to avoid building a huge list.
   
    Validate the correctness of the flagged issue. If correct, how can I resolve this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
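   A minimal sketch of the batched deletion the comment proposes. This uses `sqlite3` as a stand-in for Superset's `db.session`; the table layout, terminal statuses, cutoff, and batch size are illustrative assumptions, not the PR's actual schema:
   
   ```python
   import sqlite3
   
   # Stand-in for the tasks table; in Superset this would be the Task
   # model and db.session, not raw sqlite3.
   conn = sqlite3.connect(":memory:")
   conn.execute(
       "CREATE TABLE tasks (id INTEGER PRIMARY KEY, status TEXT, changed_on TEXT)"
   )
   conn.executemany(
       "INSERT INTO tasks VALUES (?, ?, ?)",
       [(i, "success", "2024-06-01") for i in range(10_000)],
   )
   
   BATCH_SIZE = 1_000  # assumption; bounds the work done per statement
   cutoff = "2025-01-01"
   total_deleted = 0
   while True:
       # Each iteration deletes at most BATCH_SIZE matching rows, so no
       # full ID list is ever materialized in Python memory.
       cur = conn.execute(
           "DELETE FROM tasks WHERE id IN ("
           " SELECT id FROM tasks"
           " WHERE status IN ('success', 'failure', 'aborted', 'timed_out')"
           " AND changed_on < ? LIMIT ?)",
           (cutoff, BATCH_SIZE),
       )
       conn.commit()
       if cur.rowcount == 0:
           break
       total_deleted += cur.rowcount
   
   print(total_deleted)  # 10000
   ```
   
   Committing per batch also keeps transactions short, which matters for the progress logging the prune loop already does.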



##########
superset/commands/tasks/cancel.py:
##########
@@ -0,0 +1,301 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unified cancel task command for GTF."""
+
+import logging
+from functools import partial
+from typing import TYPE_CHECKING
+
+from superset_core.api.tasks import TaskScope, TaskStatus
+
+from superset.commands.base import BaseCommand
+from superset.commands.tasks.exceptions import (
+    TaskAbortFailedError,
+    TaskNotAbortableError,
+    TaskNotFoundError,
+    TaskPermissionDeniedError,
+)
+from superset.extensions import security_manager
+from superset.tasks.locks import task_lock
+from superset.tasks.utils import get_active_dedup_key
+from superset.utils.core import get_user_id
+from superset.utils.decorators import on_error, transaction
+
+if TYPE_CHECKING:
+    from superset.models.tasks import Task
+
+logger = logging.getLogger(__name__)
+
+
+class CancelTaskCommand(BaseCommand):
+    """
+    Unified command to cancel a task.
+
+    Behavior:
+    - For private tasks or single-subscriber tasks: aborts the task
+    - For shared tasks with multiple subscribers (non-admin): unsubscribes user
+    - For shared tasks with force=True (admin only): aborts for all subscribers
+
+    The term "cancel" is user-facing; internally this may abort or unsubscribe.
+
+    This command acquires a distributed lock before starting a transaction to
+    prevent race conditions with concurrent submit/cancel operations.
+
+    Permission checks are deferred to inside the lock to minimize SELECTs:
+    we only fetch the task once, then validate permissions on the fetched data.
+    """
+
+    def __init__(self, task_uuid: str, force: bool = False):
+        """
+        Initialize the cancel command.
+
+        :param task_uuid: UUID of the task to cancel
+        :param force: If True, force abort even with multiple subscribers (admin only)
+        """
+        self._task_uuid = task_uuid
+        self._force = force
+        self._action_taken: str = (
+            "cancelled"  # Will be set to 'aborted' or 'unsubscribed'
+        )
+        self._should_publish_abort: bool = False
+
+    def run(self) -> "Task":
+        """
+        Execute the cancel command with distributed locking.
+
+        The lock is acquired BEFORE starting the transaction to avoid holding
+        a DB connection during lock acquisition. Uses dedup_key as lock key
+        to ensure Submit and Cancel operations use the same lock.
+
+        :returns: The updated task model
+        """
+        from superset.daos.tasks import TaskDAO
+
+        # Lightweight fetch to compute dedup_key for locking
+        # This is needed to use the same lock key as SubmitTaskCommand
+        task = TaskDAO.find_one_or_none(
+            skip_base_filter=security_manager.is_admin(), uuid=self._task_uuid
+        )
+
+        if not task:
+            raise TaskNotFoundError()
+
+        # Compute dedup_key using the same logic as SubmitTaskCommand
+        dedup_key = get_active_dedup_key(
+            scope=task.scope,
+            task_type=task.task_type,
+            task_key=task.task_key,
+            user_id=task.user_id if task.is_private else None,
+        )
+
+        # Acquire lock BEFORE transaction starts
+        # Using dedup_key ensures Submit and Cancel use the same lock
+        with task_lock(dedup_key):
+            result = self._execute_with_transaction()
+
+        # Publish abort notification AFTER transaction commits
+        # This prevents race conditions where listeners check DB before commit
+        if self._should_publish_abort:
+            from superset.tasks.manager import TaskManager
+
+            TaskManager.publish_abort(self._task_uuid)
+
+        return result
+
+    @transaction(on_error=partial(on_error, reraise=TaskAbortFailedError))
+    def _execute_with_transaction(self) -> "Task":
+        """
+        Execute the cancel operation inside a transaction.
+
+        Combines fetch + validation + execution in a single transaction,
+        reducing the number of SELECTs from 3 to 1 (plus DAO operations).
+
+        :returns: The updated task model
+        """
+        from superset.daos.tasks import TaskDAO
+
+        # Check admin status (no DB access)
+        is_admin = security_manager.is_admin()
+
+        # Force flag requires admin
+        if self._force and not is_admin:
+            raise TaskPermissionDeniedError(
+                "Only administrators can force cancel a task"
+            )
+
+        # Single SELECT: fetch task and validate permissions on it
+        task = TaskDAO.find_one_or_none(skip_base_filter=is_admin, uuid=self._task_uuid)
+
+        if not task:
+            raise TaskNotFoundError()
+
+        # Validate permissions on the fetched task
+        self._validate_permissions(task, is_admin)
+
+        # Execute cancel and return updated task
+        return self._do_cancel(task, is_admin)
+
+    def _validate_permissions(self, task: "Task", is_admin: bool) -> None:
+        """
+        Validate permissions on an already-fetched task.
+
+        Permission rules by scope:
+        - private: Only creator or admin (already filtered by base_filter)
+        - shared: Subscribers or admin
+        - system: Only admin
+
+        :param task: The task to validate permissions for
+        :param is_admin: Whether current user is admin
+        :raises TaskAbortFailedError: If task is not in cancellable state
+        :raises TaskPermissionDeniedError: If user lacks permission
+        """
+        # Check if task is in a cancellable state
+        if task.status not in [
+            TaskStatus.PENDING.value,
+            TaskStatus.IN_PROGRESS.value,
+            TaskStatus.ABORTING.value,  # Already aborting is OK (idempotent)
+        ]:
+            raise TaskAbortFailedError()
+
+        # Admin can cancel anything
+        if is_admin:
+            return
+
+        # Non-admin permission checks by scope
+        user_id = get_user_id()
+
+        if task.scope == TaskScope.SYSTEM.value:

Review Comment:
   **Suggestion:** Inconsistent TaskScope comparison: the code compares `task.scope` to `TaskScope.SYSTEM.value`, but `task.scope` may be stored as either the enum member or a string; comparing to `.value` will miss matches when `task.scope` is the enum and allow unauthorized non-admin cancellations of system tasks. Normalize the scope to the enum before comparing (e.g., `TaskScope(task.scope) == TaskScope.SYSTEM`). [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Unauthorized cancellation of system tasks.
   - ⚠️ CancelTaskCommand permission enforcement compromised.
   - ⚠️ Task abort audit/integrity may be violated.
   ```
   </details>
   
   ```suggestion
        # Normalize scope to TaskScope enum to support either stored string or enum
           if TaskScope(task.scope) == TaskScope.SYSTEM:
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
    1. Ensure a Task row exists such that TaskDAO.find_one_or_none() returns a model where
       `task.scope` is a TaskScope enum instance (not a string). The cancel flow starts at
       `superset/commands/tasks/cancel.py:76` (def run), which fetches the task via
       `TaskDAO.find_one_or_none(...)` at lines `86-92`.
    
    2. As a non-admin user, call CancelTaskCommand.run() (instantiate CancelTaskCommand and
       invoke run) so execution reaches `_execute_with_transaction()` at
       `superset/commands/tasks/cancel.py:119` and then `_validate_permissions()` at
       `superset/commands/tasks/cancel.py:152`.
    
    3. Inside `_validate_permissions()`, the code at lines `181-185` performs
       `if task.scope == TaskScope.SYSTEM.value:`. If `task.scope` is a TaskScope enum
       instance, this equality check will be False (enum != string), so the admin-only
       branch is skipped.
    
    4. Because the system-scope guard was missed, the command proceeds to subsequent
       non-admin checks (subscriber checks at `superset/commands/tasks/cancel.py:187-192`)
       and may allow a non-admin to continue to `_do_cancel()` and abort/unsubscribe a
       system task, producing an unauthorized cancellation. This demonstrates the
       permission-check bypass.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/commands/tasks/cancel.py
   **Line:** 181:181
   **Comment:**
     *Logic Error: Inconsistent TaskScope comparison: the code compares `task.scope` to `TaskScope.SYSTEM.value`, but `task.scope` may be stored as either the enum member or a string; comparing to `.value` will miss matches when `task.scope` is the enum and allow unauthorized non-admin cancellations of system tasks. Normalize the scope to the enum before comparing (e.g., `TaskScope(task.scope) == TaskScope.SYSTEM`).
   
    Validate the correctness of the flagged issue. If correct, how can I resolve this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
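   The suggested normalization can be checked in isolation. This sketch uses a stand-in enum; the real `TaskScope` lives in `superset_core.api.tasks` and its member names here are assumed:
   
   ```python
   from enum import Enum
   
   # Stand-in for superset_core.api.tasks.TaskScope (member names assumed).
   class TaskScope(Enum):
       PRIVATE = "private"
       SHARED = "shared"
       SYSTEM = "system"
   
   def is_system(scope) -> bool:
       # Calling an Enum accepts both a member and its value, so this
       # comparison works whether the ORM returns the enum or the string.
       return TaskScope(scope) == TaskScope.SYSTEM
   
   print(is_system("system"))          # True
   print(is_system(TaskScope.SYSTEM))  # True
   print(is_system("private"))         # False
   ```
   
   By contrast, `scope == TaskScope.SYSTEM.value` is only True for the string form, which is exactly the bypass described above.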



##########
superset/migrations/versions/2025_12_18_0220_create_tasks_table.py:
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Create tasks and task_subscriber tables for Global Task Framework (GTF)
+
+Revision ID: 4b2a8c9d3e1f
+Revises: 9787190b3d89
+Create Date: 2025-12-18 02:20:00.000000
+
+"""
+
+from sqlalchemy import (
+    Column,
+    DateTime,
+    Integer,
+    String,
+    Text,
+    UniqueConstraint,
+)
+
+from superset.migrations.shared.utils import (
+    create_fks_for_table,
+    create_index,
+    create_table,
+    drop_fks_for_table,
+    drop_index,
+    drop_table,
+)
+
+# revision identifiers, used by Alembic.
+revision = "4b2a8c9d3e1f"
+down_revision = "9787190b3d89"
+
+TASKS_TABLE = "tasks"
+TASK_SUBSCRIBERS_TABLE = "task_subscribers"
+
+
+def upgrade():
+    """
+    Create tasks and task_subscribers tables for the Global Task Framework (GTF).
+
+    This migration creates:
+    1. tasks table - unified tracking for all long running tasks
+    2. task_subscribers table - multi-user task subscriptions for shared tasks
+
+    The scope feature allows tasks to be:
+    - private: user-specific (default)
+    - shared: multi-user collaborative tasks
+    - system: admin-only background tasks
+    """
+    # Create tasks table
+    create_table(
+        TASKS_TABLE,
+        Column("id", Integer, primary_key=True),
+        Column("uuid", String(36), nullable=False, unique=True),
+        Column("task_key", String(256), nullable=False),  # For deduplication
+        Column("task_type", String(100), nullable=False),  # e.g., 'sql_execution'
+        Column("task_name", String(256), nullable=True),  # Human readable name
+        Column(
+            "scope", String(20), nullable=False, server_default="private"
+        ),  # private/shared/system
+        Column("status", String(50), nullable=False),  # PENDING, IN_PROGRESS, etc.
+        Column("dedup_key", String(512), nullable=False),  # Computed deduplication key

Review Comment:
   **Suggestion:** Compatibility/runtime error risk on MySQL/InnoDB with utf8mb4: `dedup_key` is defined as `String(512)` and then indexed uniquely; large indexed varchar columns can exceed MySQL index size limits and make the migration fail on common MySQL setups. Reduce the column length (e.g. to 191) or use a hashed/shorter column for indexes to ensure the unique index can be created on all supported MySQL configurations. [possible bug]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Migration fails on MySQL utf8mb4 installs.
   - ⚠️ GTF unavailable until migration fixed.
   ```
   </details>
   
   ```suggestion
        Column("dedup_key", String(191), nullable=False),  # Computed deduplication key (shortened for indexed column compatibility)
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
    1. The migration defines dedup_key as Column("dedup_key", String(512), nullable=False)
       at line 76 and then creates a unique index on dedup_key at line 91
       (`create_index(... "idx_tasks_dedup_key" ...)`).
    
    2. Run Alembic upgrade against a MySQL/MariaDB server with default utf8mb4 settings
       (a common production configuration).
    
    3. When op.create_index() (called via shared/utils.create_index at line 416 in that
       module) issues the CREATE INDEX, MySQL may reject it because a 512-char utf8mb4
       varchar exceeds InnoDB prefix/index byte limits (index length > 767/1000 bytes),
       causing the migration to fail with a MySQL error (index key too long).
    
    4. Observing this requires applying the migration on a MySQL instance configured with
       utf8mb4 and default InnoDB limits; failure is deterministic in that environment.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/migrations/versions/2025_12_18_0220_create_tasks_table.py
   **Line:** 76:76
   **Comment:**
     *Possible Bug: Compatibility/runtime error risk on MySQL/InnoDB with utf8mb4: `dedup_key` is defined as `String(512)` and then indexed uniquely; large indexed varchar columns can exceed MySQL index size limits and make the migration fail on common MySQL setups. Reduce the column length (e.g. to 191) or use a hashed/shorter column for indexes to ensure the unique index can be created on all supported MySQL configurations.
   
    Validate the correctness of the flagged issue. If correct, how can I resolve this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
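   The byte arithmetic behind the warning can be sketched directly. The 767-byte and 3072-byte figures are the standard InnoDB index key limits for legacy and DYNAMIC/COMPRESSED row formats respectively; which limit applies depends on MySQL version and row format, so treat this as a rule-of-thumb check:
   
   ```python
   # InnoDB index key limits (bytes): 767 for legacy REDUNDANT/COMPACT row
   # formats, 3072 for DYNAMIC/COMPRESSED with large index prefixes enabled.
   LEGACY_LIMIT = 767
   MODERN_LIMIT = 3072
   UTF8MB4_BYTES_PER_CHAR = 4  # worst-case width for utf8mb4
   
   def index_bytes(varchar_len: int) -> int:
       # Maximum bytes a fully-indexed VARCHAR(n) column can occupy in a key.
       return varchar_len * UTF8MB4_BYTES_PER_CHAR
   
   print(index_bytes(512))  # 2048: exceeds 767, fits under 3072
   print(index_bytes(191))  # 764: fits even the legacy 767-byte limit
   ```
   
   This is why 191 is the customary "safe" length for uniquely indexed utf8mb4 columns, and why `String(512)` only works when every supported install uses the modern row format.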



##########
superset/migrations/versions/2025_12_18_0220_create_tasks_table.py:
##########
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Create tasks and task_subscriber tables for Global Task Framework (GTF)
+
+Revision ID: 4b2a8c9d3e1f
+Revises: 9787190b3d89
+Create Date: 2025-12-18 02:20:00.000000
+
+"""
+
+from sqlalchemy import (
+    Column,
+    DateTime,
+    Integer,
+    String,
+    Text,
+    UniqueConstraint,
+)
+
+from superset.migrations.shared.utils import (
+    create_fks_for_table,
+    create_index,
+    create_table,
+    drop_fks_for_table,
+    drop_index,
+    drop_table,
+)
+
+# revision identifiers, used by Alembic.
+revision = "4b2a8c9d3e1f"
+down_revision = "9787190b3d89"
+
+TASKS_TABLE = "tasks"
+TASK_SUBSCRIBERS_TABLE = "task_subscribers"
+
+
+def upgrade():
+    """
+    Create tasks and task_subscribers tables for the Global Task Framework (GTF).
+
+    This migration creates:
+    1. tasks table - unified tracking for all long running tasks
+    2. task_subscribers table - multi-user task subscriptions for shared tasks
+
+    The scope feature allows tasks to be:
+    - private: user-specific (default)
+    - shared: multi-user collaborative tasks
+    - system: admin-only background tasks
+    """
+    # Create tasks table
+    create_table(
+        TASKS_TABLE,
+        Column("id", Integer, primary_key=True),
+        Column("uuid", String(36), nullable=False, unique=True),
+        Column("task_key", String(256), nullable=False),  # For deduplication
+        Column("task_type", String(100), nullable=False),  # e.g., 'sql_execution'
+        Column("task_name", String(256), nullable=True),  # Human readable name
+        Column(
+            "scope", String(20), nullable=False, server_default="private"
+        ),  # private/shared/system
+        Column("status", String(50), nullable=False),  # PENDING, IN_PROGRESS, etc.
+        Column("dedup_key", String(512), nullable=False),  # Computed deduplication key
+        # AuditMixinNullable columns
+        Column("created_on", DateTime, nullable=True),
+        Column("changed_on", DateTime, nullable=True),
+        Column("created_by_fk", Integer, nullable=True),
+        Column("changed_by_fk", Integer, nullable=True),
+        # Task-specific columns
+        Column("started_at", DateTime, nullable=True),
+        Column("ended_at", DateTime, nullable=True),
+        Column("user_id", Integer, nullable=True),  # User context for execution
+        Column("payload", Text, nullable=True),  # JSON serialized task-specific data
+        Column("properties", Text, nullable=True),  # JSON serialized properties
+    )
+
+    # Create indexes for optimal query performance
+    create_index(TASKS_TABLE, "idx_tasks_dedup_key", ["dedup_key"], unique=True)

Review Comment:
   **Suggestion:** Logic bug: deduplication is scope-aware but the migration creates a globally-unique index on `dedup_key`, preventing the same dedup key from being used in different `scope` values; change the index to include `scope` so uniqueness is enforced per-scope (or make the index non-unique if global uniqueness is intended). [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ GTF task creation blocked across scopes.
   - ⚠️ Deduplication semantics differ from design.
   ```
   </details>
   
   ```suggestion
        create_index(TASKS_TABLE, "idx_tasks_dedup_key", ["scope", "dedup_key"], unique=True)
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
    1. Apply the migration by running Alembic upgrade, which executes upgrade() in
       superset/migrations/versions/2025_12_18_0220_create_tasks_table.py (upgrade()
       starts at line 51) and performs index creation at line 91
       (`create_index(... "idx_tasks_dedup_key" ...)`).
    
    2. After migration, insert a task row into the tasks table with dedup_key='K' and
       scope='private' (the tasks table/columns are created at lines 65-88; the
       dedup_key column is defined at line 76).
    
    3. Attempt to insert a second task row with the same dedup_key='K' but scope='shared'.
    
    4. The DB will reject the second insert with a unique constraint/IntegrityError
       because the unique index created at line 91 enforces uniqueness on dedup_key alone
       across all scopes (expected behavior: allow the same dedup_key in different
       scopes), reproducing the logic bug.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/migrations/versions/2025_12_18_0220_create_tasks_table.py
   **Line:** 91:91
   **Comment:**
     *Logic Error: Logic bug: deduplication is scope-aware but the migration creates a globally-unique index on `dedup_key`, preventing the same dedup key from being used in different `scope` values; change the index to include `scope` so uniqueness is enforced per-scope (or make the index non-unique if global uniqueness is intended).
   
    Validate the correctness of the flagged issue. If correct, how can I resolve this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
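   The per-scope uniqueness the suggestion asks for can be demonstrated with a composite unique index; `sqlite3` stands in for the real backend here, and the table is a reduced sketch of the migration's schema:
   
   ```python
   import sqlite3
   
   conn = sqlite3.connect(":memory:")
   conn.execute(
       "CREATE TABLE tasks (id INTEGER PRIMARY KEY, scope TEXT, dedup_key TEXT)"
   )
   # Composite index: uniqueness is enforced per (scope, dedup_key) pair,
   # not on dedup_key globally.
   conn.execute("CREATE UNIQUE INDEX idx_tasks_dedup_key ON tasks (scope, dedup_key)")
   
   conn.execute("INSERT INTO tasks (scope, dedup_key) VALUES ('private', 'K')")
   conn.execute("INSERT INTO tasks (scope, dedup_key) VALUES ('shared', 'K')")  # allowed
   
   try:
       # Same (scope, dedup_key) pair as an existing row: rejected.
       conn.execute("INSERT INTO tasks (scope, dedup_key) VALUES ('shared', 'K')")
       duplicate_rejected = False
   except sqlite3.IntegrityError:
       duplicate_rejected = True
   
   print(duplicate_rejected)  # True
   ```
   
   With the single-column unique index from the original migration, the second insert ('shared', 'K') would already fail, which is the cross-scope blocking described in the repro steps.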



##########
superset/commands/tasks/cancel.py:
##########
@@ -0,0 +1,301 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unified cancel task command for GTF."""
+
+import logging
+from functools import partial
+from typing import TYPE_CHECKING
+
+from superset_core.api.tasks import TaskScope, TaskStatus
+
+from superset.commands.base import BaseCommand
+from superset.commands.tasks.exceptions import (
+    TaskAbortFailedError,
+    TaskNotAbortableError,
+    TaskNotFoundError,
+    TaskPermissionDeniedError,
+)
+from superset.extensions import security_manager
+from superset.tasks.locks import task_lock
+from superset.tasks.utils import get_active_dedup_key
+from superset.utils.core import get_user_id
+from superset.utils.decorators import on_error, transaction
+
+if TYPE_CHECKING:
+    from superset.models.tasks import Task
+
+logger = logging.getLogger(__name__)
+
+
+class CancelTaskCommand(BaseCommand):
+    """
+    Unified command to cancel a task.
+
+    Behavior:
+    - For private tasks or single-subscriber tasks: aborts the task
+    - For shared tasks with multiple subscribers (non-admin): unsubscribes user
+    - For shared tasks with force=True (admin only): aborts for all subscribers
+
+    The term "cancel" is user-facing; internally this may abort or unsubscribe.
+
+    This command acquires a distributed lock before starting a transaction to
+    prevent race conditions with concurrent submit/cancel operations.
+
+    Permission checks are deferred to inside the lock to minimize SELECTs:
+    we only fetch the task once, then validate permissions on the fetched data.
+    """
+
+    def __init__(self, task_uuid: str, force: bool = False):
+        """
+        Initialize the cancel command.
+
+        :param task_uuid: UUID of the task to cancel
+        :param force: If True, force abort even with multiple subscribers (admin only)
+        """
+        self._task_uuid = task_uuid
+        self._force = force
+        self._action_taken: str = (
+            "cancelled"  # Will be set to 'aborted' or 'unsubscribed'
+        )
+        self._should_publish_abort: bool = False
+
+    def run(self) -> "Task":
+        """
+        Execute the cancel command with distributed locking.
+
+        The lock is acquired BEFORE starting the transaction to avoid holding
+        a DB connection during lock acquisition. Uses dedup_key as lock key
+        to ensure Submit and Cancel operations use the same lock.
+
+        :returns: The updated task model
+        """
+        from superset.daos.tasks import TaskDAO
+
+        # Lightweight fetch to compute dedup_key for locking
+        # This is needed to use the same lock key as SubmitTaskCommand
+        task = TaskDAO.find_one_or_none(
+            skip_base_filter=security_manager.is_admin(), uuid=self._task_uuid
+        )
+
+        if not task:
+            raise TaskNotFoundError()
+
+        # Compute dedup_key using the same logic as SubmitTaskCommand
+        dedup_key = get_active_dedup_key(
+            scope=task.scope,
+            task_type=task.task_type,
+            task_key=task.task_key,
+            user_id=task.user_id if task.is_private else None,
+        )
+
+        # Acquire lock BEFORE transaction starts
+        # Using dedup_key ensures Submit and Cancel use the same lock
+        with task_lock(dedup_key):
+            result = self._execute_with_transaction()
+
+        # Publish abort notification AFTER transaction commits
+        # This prevents race conditions where listeners check DB before commit
+        if self._should_publish_abort:
+            from superset.tasks.manager import TaskManager
+
+            TaskManager.publish_abort(self._task_uuid)
+
+        return result
+
+    @transaction(on_error=partial(on_error, reraise=TaskAbortFailedError))
+    def _execute_with_transaction(self) -> "Task":
+        """
+        Execute the cancel operation inside a transaction.
+
+        Combines fetch + validation + execution in a single transaction,
+        reducing the number of SELECTs from 3 to 1 (plus DAO operations).
+
+        :returns: The updated task model
+        """
+        from superset.daos.tasks import TaskDAO
+
+        # Check admin status (no DB access)
+        is_admin = security_manager.is_admin()
+
+        # Force flag requires admin
+        if self._force and not is_admin:
+            raise TaskPermissionDeniedError(
+                "Only administrators can force cancel a task"
+            )
+
+        # Single SELECT: fetch task and validate permissions on it
+        task = TaskDAO.find_one_or_none(skip_base_filter=is_admin, uuid=self._task_uuid)
+
+        if not task:
+            raise TaskNotFoundError()
+
+        # Validate permissions on the fetched task
+        self._validate_permissions(task, is_admin)
+
+        # Execute cancel and return updated task
+        return self._do_cancel(task, is_admin)
+
+    def _validate_permissions(self, task: "Task", is_admin: bool) -> None:
+        """
+        Validate permissions on an already-fetched task.
+
+        Permission rules by scope:
+        - private: Only creator or admin (already filtered by base_filter)
+        - shared: Subscribers or admin
+        - system: Only admin
+
+        :param task: The task to validate permissions for
+        :param is_admin: Whether current user is admin
+        :raises TaskAbortFailedError: If task is not in cancellable state
+        :raises TaskPermissionDeniedError: If user lacks permission
+        """
+        # Check if task is in a cancellable state
+        if task.status not in [
+            TaskStatus.PENDING.value,
+            TaskStatus.IN_PROGRESS.value,
+            TaskStatus.ABORTING.value,  # Already aborting is OK (idempotent)
+        ]:
+            raise TaskAbortFailedError()
+
+        # Admin can cancel anything
+        if is_admin:
+            return
+
+        # Non-admin permission checks by scope
+        user_id = get_user_id()
+
+        if task.scope == TaskScope.SYSTEM.value:
+            # System tasks are admin-only
+            raise TaskPermissionDeniedError(
+                "Only administrators can cancel system tasks"
+            )
+
+        if task.is_shared:
+            # Shared tasks: must be a subscriber
+            if not user_id or not task.has_subscriber(user_id):
+                raise TaskPermissionDeniedError(
+                    "You must be subscribed to cancel this shared task"
+                )
+
+        # Private tasks: already filtered by base_filter (only creator can see)
+        # If we got here, user has permission
+
+    def _do_cancel(self, task: "Task", is_admin: bool) -> "Task":
+        """
+        Execute the cancel operation (abort or unsubscribe).
+
+        :param task: The task to cancel
+        :param is_admin: Whether current user is admin
+        :returns: The updated task model
+        """
+        user_id = get_user_id()
+
+        # Determine action based on task scope and force flag
+        should_abort = (
+            # Admin with force flag always aborts
+            (is_admin and self._force)
+            # Private tasks always abort (only one user)
+            or task.is_private
+            # System tasks always abort (admin only anyway)
+            or task.is_system
+            # Single or last subscriber - abort
+            or task.subscriber_count <= 1
+        )
+
+        if should_abort:
+            return self._do_abort(task, is_admin)
+        else:
+            return self._do_unsubscribe(task, user_id)
+
+    def _do_abort(self, task: "Task", is_admin: bool) -> "Task":
+        """
+        Execute abort operation.
+
+        :param task: The task to abort
+        :param is_admin: Whether current user is admin
+        :returns: The updated task model
+        """
+        from superset.daos.tasks import TaskDAO
+
+        try:
+            result = TaskDAO.abort_task(task.uuid, skip_base_filter=is_admin)
+        except TaskNotAbortableError:
+            raise
+
+        if result is None:
+            # abort_task returned None - task wasn't aborted
+            # This can happen if task is already finished
+            raise TaskAbortFailedError()
+
+        self._action_taken = "aborted"
+
+        # Track if we need to publish abort after commit
+        if result.status == TaskStatus.ABORTING.value:

Review Comment:
   **Suggestion:** Inconsistent TaskStatus comparison: the code checks `result.status == TaskStatus.ABORTING.value`, but `result.status` may be stored as either an enum or a string; this comparison can silently fail to detect an aborting state. Normalize the status to the enum before comparing (e.g., `TaskStatus(result.status) == TaskStatus.ABORTING`). [type error]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ Abort notifications not published post-abort.
   - ⚠️ TaskManager.publish_abort callers receive no signal.
   - ⚠️ Subscribers/listeners miss abort events.
   ```
   </details>
   
   ```suggestion
           if TaskStatus(result.status) == TaskStatus.ABORTING:
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
    1. Trigger CancelTaskCommand.run() for a task that will reach `_do_abort()` at
       `superset/commands/tasks/cancel.py:224` (this path is reached when `should_abort`
       is True in `_do_cancel()` at `superset/commands/tasks/cancel.py:207-222`).
    
    2. `_do_abort()` calls `TaskDAO.abort_task(...)` at
       `superset/commands/tasks/cancel.py:235` and assigns the returned model to `result`.
    
    3. If the `result.status` attribute is a TaskStatus enum instance (not a string), the
       equality check at `superset/commands/tasks/cancel.py:247` (`result.status ==
       TaskStatus.ABORTING.value`) will be False, so `self._should_publish_abort` remains
       False.
    
    4. After the transaction commits, `run()` checks `self._should_publish_abort` at
       `superset/commands/tasks/cancel.py:112-116` and, because it is False, skips
       `TaskManager.publish_abort(self._task_uuid)`. The abort notification is not
       published, so any abort listeners relying on publish/subscribe will not be
       notified.
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/commands/tasks/cancel.py
   **Line:** 247:247
   **Comment:**
     *Type Error: Inconsistent TaskStatus comparison: the code checks `result.status == TaskStatus.ABORTING.value`, but `result.status` may be stored as either an enum or a string; this comparison can silently fail to detect an aborting state. Normalize the status to the enum before comparing (e.g., `TaskStatus(result.status) == TaskStatus.ABORTING`).
   
    Validate the correctness of the flagged issue. If correct, how can I resolve this? If you propose a fix, implement it and please make it concise.
   ```
   </details>


