codeant-ai-for-open-source[bot] commented on code in PR #36368:
URL: https://github.com/apache/superset/pull/36368#discussion_r2748720072


##########
superset/tasks/utils.py:
##########
@@ -142,3 +151,166 @@ def fetch_csrf_token(
 
     logger.error("Error fetching CSRF token, status code: %s", response.status)
     return {}
+
+
+def generate_random_task_key() -> str:
+    """
+    Generate a random task key.
+
+    This is the default behavior - each task submission gets a unique UUID
+    unless an explicit task_key is provided in TaskOptions.
+
+    :returns: A random UUID string
+    """
+    return str(uuid4())
+
+
+def get_active_dedup_key(
+    scope: TaskScope | str,
+    task_type: str,
+    task_key: str,
+    user_id: int | None = None,
+) -> str:
+    """
+    Build a deduplication key for active tasks.
+
+    The dedup_key enforces uniqueness at the database level via a unique index.
+    Active tasks use a composite key based on scope, which is then hashed using
+    the configured HASH_ALGORITHM to produce a fixed-length key.
+
+    The composite key format before hashing is:
+    - Private: private|task_type|task_key|user_id
+    - Shared: shared|task_type|task_key
+    - System: system|task_type|task_key
+
+    The final key is a hash digest (64 chars for sha256, 32 chars for md5).
+
+    :param scope: Task scope (PRIVATE/SHARED/SYSTEM) as TaskScope enum or string
+    :param task_type: Type of task (e.g., 'sql_execution')
+    :param task_key: Task identifier for deduplication
+    :param user_id: User ID for private tasks (falls back to g.user if not provided)
+    :returns: Hashed deduplication key string
+    :raises ValueError: If user_id is missing for private scope
+    """
+    # Convert string to TaskScope if needed
+    if isinstance(scope, str):
+        scope = TaskScope(scope)
+
+    # Build composite key
+    match scope:
+        case TaskScope.PRIVATE:
+            # Use provided user_id, or fall back to current user from Flask context
+            effective_user_id = user_id
+            if effective_user_id is None:
+                from superset.utils.core import get_user_id
+
+                effective_user_id = get_user_id()
+            if effective_user_id is None:
+                raise ValueError("user_id required for private tasks")
+            composite_key = f"{scope.value}|{task_type}|{task_key}|{effective_user_id}"
+        case TaskScope.SHARED:
+            composite_key = f"{scope.value}|{task_type}|{task_key}"
+        case TaskScope.SYSTEM:
+            composite_key = f"{scope.value}|{task_type}|{task_key}"
+        case _:
+            raise ValueError(f"Invalid scope: {scope}")
+
+    # Hash the composite key to produce a fixed-length dedup_key
+    # Truncate to 64 chars max to fit the database column in case
+    # a hash algo is used that generates hashes that exceed 64 chars
+    return hash_from_str(composite_key)[:64]
+
+
+def get_finished_dedup_key(task_uuid: str) -> str:
+    """
+    Build a deduplication key for finished tasks.
+
+    When a task completes (success, failure, or abort), its dedup_key is
+    changed to its UUID. This frees up the slot so new tasks with the same
+    parameters can be created.
+
+    :param task_uuid: Task UUID
+    :returns: The task UUID as the dedup key
+
+    Example:
+        >>> get_finished_dedup_key("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
+        'a1b2c3d4-e5f6-7890-abcd-ef1234567890'
+    """
+    return task_uuid
+
+
+# -----------------------------------------------------------------------------
+# TaskProperties helper functions
+# -----------------------------------------------------------------------------
+
+
+def progress_update(progress: float | int | tuple[int, int]) -> TaskProperties:
+    """
+    Create a properties update dict for progress values.
+
+    :param progress: One of:
+        - float (0.0-1.0): Percentage only
+        - int: Count only (total unknown)
+        - tuple[int, int]: (current, total) with auto-computed percentage
+    :returns: TaskProperties dict with appropriate progress fields set
+
+    Example:
+        task.update_properties(progress_update((50, 100)))
+    """
+    if isinstance(progress, float):
+        return {"progress_percent": progress}
+    if isinstance(progress, int):
+        return {"progress_current": progress}
+    # tuple
+    current, total = progress
+    result: TaskProperties = {
+        "progress_current": current,
+        "progress_total": total,
+    }
+    if total > 0:
+        result["progress_percent"] = current / total
+    return result
+
+
+def error_update(exception: BaseException) -> TaskProperties:
+    """
+    Create a properties update dict from an exception.
+
+    :param exception: The exception that caused the failure
+    :returns: TaskProperties dict with error fields populated
+    """
+    return {
+        "error_message": str(exception),
+        "exception_type": type(exception).__name__,
+        "stack_trace": traceback.format_exc(),

Review Comment:
   **Suggestion:** The stack trace in the error properties is taken from `traceback.format_exc()` instead of the passed-in exception, so if `error_update` is called outside an `except` block (or after a different exception was handled) it will record an empty or incorrect traceback unrelated to the actual exception object. [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ Task error reports store incorrect stack traces.
   - ⚠️ Task debugging information becomes misleading.
   - ⚠️ Maintainers waste time investigating wrong traces.
   ```
   </details>
   
   ```suggestion
           "stack_trace": "".join(
               traceback.format_exception(
                   type(exception), exception, exception.__traceback__
               )
           ),
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. In a Python REPL or test, import the helper at 
`superset/tasks/utils.py:275`:
   
      from superset.tasks.utils import error_update
   
   2. Create an exception object and call the helper outside an except block:
   
      ex = ValueError("test")
   
      props = error_update(ex)
   
      (This calls `superset/tasks/utils.py:275-286`.)
   
   3. Observe `props["stack_trace"]` contains the most recent handled traceback 
(or an
   empty/irrelevant trace)
   
      because `traceback.format_exc()` uses the current exception context, not 
the passed
      `ex`.
   
   4. Alternatively, reproduce by calling `error_update()` after handling a 
different
   exception:
   
      try:
   
          raise KeyError("other")
   
      except KeyError:
   
          pass
   
      props = error_update(ValueError("test"))
   
      The recorded stack trace corresponds to the earlier KeyError (or is 
empty), not the
      ValueError passed in.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/tasks/utils.py
   **Line:** 285:285
   **Comment:**
        *Logic Error: The stack trace in the error properties is taken from 
`traceback.format_exc()` instead of the passed-in exception, so if 
`error_update` is called outside an `except` block (or after a different 
exception was handled) it will record an empty or incorrect traceback unrelated 
to the actual exception object.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
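
   For anyone validating this locally, here is a minimal standalone sketch (not code from this PR, standard library only) contrasting the two traceback APIs outside an `except` block:

   ```python
   import traceback


   def trace_from_context() -> str:
       # Mirrors the current behavior: format_exc() reads whatever exception is
       # being handled *right now*, not the exception object you pass around.
       return traceback.format_exc()


   def trace_from_exception(exc: BaseException) -> str:
       # Formats the traceback attached to the exception object itself.
       return "".join(
           traceback.format_exception(type(exc), exc, exc.__traceback__)
       )


   saved = None
   try:
       raise ValueError("boom")
   except ValueError as exc:
       saved = exc

   # Outside the except block there is no "current" exception anymore:
   print(trace_from_context())         # prints "NoneType: None"
   print(trace_from_exception(saved))  # prints the full ValueError traceback
   ```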



##########
tests/unit_tests/tasks/test_utils.py:
##########
@@ -330,3 +340,248 @@ def test_get_executor(
         )
         assert executor_type == expected_executor_type
         assert executor == expected_executor
+
+
+@pytest.mark.parametrize(
+    "scope,task_type,task_key,user_id,expected_composite_key",
+    [
+        # Private tasks with TaskScope enum
+        (
+            TaskScope.PRIVATE,
+            "sql_execution",
+            "chart_123",
+            42,
+            "private|sql_execution|chart_123|42",
+        ),
+        (
+            TaskScope.PRIVATE,
+            "thumbnail_gen",
+            "dash_456",
+            100,
+            "private|thumbnail_gen|dash_456|100",
+        ),
+        # Private tasks with string scope
+        (
+            "private",
+            "api_call",
+            "endpoint_789",
+            200,
+            "private|api_call|endpoint_789|200",
+        ),
+        # Shared tasks with TaskScope enum
+        (
+            TaskScope.SHARED,
+            "report_gen",
+            "monthly_report",
+            None,
+            "shared|report_gen|monthly_report",
+        ),
+        (
+            TaskScope.SHARED,
+            "export_csv",
+            "large_export",
+            999,  # user_id should be ignored for shared
+            "shared|export_csv|large_export",
+        ),
+        # Shared tasks with string scope
+        (
+            "shared",
+            "batch_process",
+            "batch_001",
+            123,  # user_id should be ignored for shared
+            "shared|batch_process|batch_001",
+        ),
+        # System tasks with TaskScope enum
+        (
+            TaskScope.SYSTEM,
+            "cleanup_task",
+            "daily_cleanup",
+            None,
+            "system|cleanup_task|daily_cleanup",
+        ),
+        (
+            TaskScope.SYSTEM,
+            "db_migration",
+            "version_123",
+            1,  # user_id should be ignored for system
+            "system|db_migration|version_123",
+        ),
+        # System tasks with string scope
+        (
+            "system",
+            "maintenance",
+            "nightly_job",
+            2,  # user_id should be ignored for system
+            "system|maintenance|nightly_job",
+        ),
+    ],
+)
+def test_get_active_dedup_key(
+    scope, task_type, task_key, user_id, expected_composite_key, mocker, app_context
+):
+    """Test get_active_dedup_key generates a hash of the composite key.
+
+    The function hashes the composite key using the configured HASH_ALGORITHM
+    to produce a fixed-length dedup_key for database storage. The result is
+    truncated to 64 chars to fit the database column.
+    """
+    # Mock get_user_id to return the specified user_id
+    mocker.patch("superset.utils.core.get_user_id", return_value=user_id)
+
+    result = get_active_dedup_key(scope, task_type, task_key)
+
+    # The result should be a hash of the expected composite key, truncated to 64 chars
+    expected_hash = hash_from_str(expected_composite_key)[:64]
+    assert result == expected_hash
+    assert len(result) <= 64
+
+
+def test_get_active_dedup_key_private_requires_user_id(mocker):
+    """Test that private tasks require user_id from get_user_id()"""
+    # Mock get_user_id to return None
+    mocker.patch("superset.utils.core.get_user_id", return_value=None)
+

Review Comment:
   **Suggestion:** Similarly, this test for the error path of private tasks patches `get_user_id` on `superset.utils.core`, which will not intercept calls if `superset.tasks.utils.get_active_dedup_key` imported `get_user_id` directly; in that case `get_active_dedup_key` will see a real user ID and not raise the expected `ValueError`, causing the test to give false negatives. [possible bug]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ This test may silently pass or fail incorrectly in CI.
   - ⚠️ Affects dedup key validation negative-path test.
   ```
   </details>
   
   ```suggestion
       mocker.patch("superset.tasks.utils.get_user_id", return_value=None)
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Open tests/unit_tests/tasks/test_utils.py at the test function
   test_get_active_dedup_key_private_requires_user_id (starts at line ~440). 
The mock is set
   at lines 442-443.
   
   2. Check superset/tasks/utils.py to see whether get_active_dedup_key calls 
get_user_id via
   a local import (e.g., `from superset.utils.core import get_user_id`) or by 
referencing
   superset.utils.core.get_user_id at call-time.
   
   3. Run pytest for the failing test: pytest
   
tests/unit_tests/tasks/test_utils.py::test_get_active_dedup_key_private_requires_user_id.
   The test patches superset.utils.core.get_user_id but not 
superset.tasks.utils.get_user_id.
   
   4. If tasks.utils bound get_user_id at import time, the test will not 
observe the patched
   None and get_active_dedup_key will not raise ValueError at line where it 
validates private
   tasks — reproducing the false-negative test behavior. If tasks.utils calls 
the core
   function dynamically, the current patch succeeds.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/unit_tests/tasks/test_utils.py
   **Line:** 443:443
   **Comment:**
        *Possible Bug: Similarly, this test for the error path of private tasks 
patches `get_user_id` on `superset.utils.core`, which will not intercept calls 
if `superset.tasks.utils.get_active_dedup_key` imported `get_user_id` directly; 
in that case `get_active_dedup_key` will see a real user ID and not raise the 
expected `ValueError`, causing the test to give false negatives.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
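
   The underlying rule is that `mocker.patch` (like `unittest.mock.patch`) replaces a name in one specific module namespace, so the right target depends on where the code under test looks the name up. A throwaway sketch (the module names here are made up) of the import-time-binding case the comment describes:

   ```python
   import sys
   import types
   from unittest import mock

   # Two in-memory modules mirroring the import styles discussed above.
   core = types.ModuleType("fake_core")
   exec("def get_user_id():\n    return 42", core.__dict__)
   sys.modules["fake_core"] = core

   consumer = types.ModuleType("fake_consumer")
   exec(
       "from fake_core import get_user_id\n"
       "def dedup():\n"
       "    return get_user_id()",
       consumer.__dict__,
   )
   sys.modules["fake_consumer"] = consumer

   # Patching the defining module does not intercept the import-time binding:
   with mock.patch("fake_core.get_user_id", return_value=None):
       assert consumer.dedup() == 42  # patch not observed

   # Patching the module where the name is actually used does:
   with mock.patch("fake_consumer.get_user_id", return_value=None):
       assert consumer.dedup() is None
   ```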



##########
superset/models/tasks.py:
##########
@@ -0,0 +1,370 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task model for Global Task Framework (GTF)"""
+
+from __future__ import annotations
+
+import uuid
+from datetime import datetime, timezone
+from typing import Any, cast
+
+from flask_appbuilder import Model
+from sqlalchemy import (
+    Column,
+    DateTime,
+    Integer,
+    String,
+    Text,
+)
+from sqlalchemy.orm import relationship
+from superset_core.api.models import Task as CoreTask
+from superset_core.api.tasks import TaskProperties, TaskStatus
+
+from superset.models.helpers import AuditMixinNullable
+from superset.models.task_subscribers import TaskSubscriber
+from superset.tasks.utils import (
+    error_update,
+    get_finished_dedup_key,
+    parse_properties,
+    serialize_properties,
+)
+from superset.utils import json
+
+
+class Task(CoreTask, AuditMixinNullable, Model):
+    """
+    Concrete Task model for the Global Task Framework (GTF).
+
+    This model represents async tasks in Superset, providing unified tracking
+    for all background operations including SQL queries, thumbnail generation,
+    reports, and other async operations.
+
+    Non-filterable fields (progress, error info, execution config) are stored
+    in a `properties` JSON blob for schema flexibility.
+    """
+
+    __tablename__ = "tasks"
+
+    # Primary key and identifiers
+    id = Column(Integer, primary_key=True)
+    uuid = Column(
+        String(36), nullable=False, unique=True, default=lambda: str(uuid.uuid4())
+    )
+
+    # Task metadata (filterable)
+    task_key = Column(String(256), nullable=False, index=True)  # For deduplication
+    task_type = Column(String(100), nullable=False, index=True)  # e.g., 'sql_execution'
+    task_name = Column(String(256), nullable=True)  # Human readable name
+    scope = Column(
+        String(20), nullable=False, index=True, default="private"
+    )  # private/shared/system
+    status = Column(
+        String(50), nullable=False, index=True, default=TaskStatus.PENDING.value
+    )
+    dedup_key = Column(
+        String(64), nullable=False, unique=True, index=True
+    )  # Hashed deduplication key (SHA-256 = 64 chars, UUID = 36 chars)
+
+    # Timestamps
+    started_at = Column(DateTime, nullable=True)
+    ended_at = Column(DateTime, nullable=True)
+
+    # User context for execution
+    user_id = Column(Integer, nullable=True)
+
+    # Task-specific output data (set by task code via ctx.update_task(payload=...))
+    payload = Column(Text, nullable=True, default="{}")
+
+    # Properties JSON blob - contains runtime state and execution config:
+    # - is_abortable: bool - has abort handler registered
+    # - progress_percent: float - progress 0.0-1.0
+    # - progress_current: int - current iteration count
+    # - progress_total: int - total iterations
+    # - error_message: str - human-readable error message
+    # - exception_type: str - exception class name
+    # - stack_trace: str - full formatted traceback
+    # - timeout: int - timeout in seconds
+    _properties = Column("properties", Text, nullable=True, default="{}")
+
+    # Transient cache for parsed properties (not persisted)
+    _properties_cache: TaskProperties | None = None
+
+    # Relationships
+    subscribers = relationship(
+        TaskSubscriber,
+        back_populates="task",
+        cascade="all, delete-orphan",
+    )
+
+    def __repr__(self) -> str:
+        return f"<Task {self.task_type}:{self.task_key} [{self.status}]>"
+
+    # -------------------------------------------------------------------------
+    # Properties accessor
+    # -------------------------------------------------------------------------
+
+    @property
+    def properties(self) -> TaskProperties:
+        """
+        Get typed properties (cached for performance).
+
+        Properties contain runtime state and execution config that doesn't
+        need database filtering. Parsed once and cached until next write.
+
+        Always use .get() for reads since keys may be absent.
+
+        :returns: TaskProperties dict (sparse - only contains keys that were set)
+        """
+        if self._properties_cache is None:
+            self._properties_cache = parse_properties(self._properties)
+        return self._properties_cache
+
+    def update_properties(self, updates: TaskProperties) -> None:
+        """
+        Update specific properties fields (merge semantics).
+
+        Only updates fields present in the updates dict.
+
+        :param updates: TaskProperties dict with fields to update
+
+        Example:
+            task.update_properties({"is_abortable": True})
+            task.update_properties(progress_update((50, 100)))
+        """
+        current = cast(TaskProperties, dict(self.properties))
+        current.update(updates)  # Merge updates
+        self._properties = serialize_properties(current)
+        self._properties_cache = current  # Update cache
+
+    # -------------------------------------------------------------------------
+    # Payload accessor (for task-specific output data)
+    # -------------------------------------------------------------------------
+
+    def get_payload(self) -> dict[str, Any]:
+        """
+        Get payload as parsed JSON.
+
+        Payload contains task-specific output data set by task code via
+        ctx.update_task(payload=...).
+
+        :returns: Dictionary containing payload data
+        """
+        try:
+            return json.loads(self.payload or "{}")
+        except (json.JSONDecodeError, TypeError):
+            return {}
+
+    def set_payload(self, data: dict[str, Any]) -> None:
+        """
+        Update payload with new data.
+
+        The payload is merged with existing data, not replaced.
+
+        :param data: Dictionary of data to merge into payload
+        """
+        current = self.get_payload()
+        current.update(data)
+        self.payload = json.dumps(current)
+
+    # -------------------------------------------------------------------------
+    # Error handling
+    # -------------------------------------------------------------------------
+
+    def set_error_from_exception(self, exception: BaseException) -> None:
+        """
+        Set error fields from an exception.
+
+        Captures the error message, exception type, and full stack trace.
+        Called automatically by the executor when a task raises an exception.
+
+        :param exception: The exception that caused the failure
+        """
+        self.update_properties(error_update(exception))
+
+    # -------------------------------------------------------------------------
+    # Status management
+    # -------------------------------------------------------------------------
+
+    def set_status(self, status: TaskStatus | str) -> None:
+        """
+        Update task status and dedup_key.
+
+        When a task finishes (success, failure, or abort), the dedup_key is
+        changed to the task's UUID. This frees up the slot so new tasks with
+        the same parameters can be created.
+
+        :param status: New task status
+        """
+        if isinstance(status, TaskStatus):
+            status = status.value
+        self.status = status
+
+        # Update timestamps and is_abortable based on status
+        now = datetime.now(timezone.utc)
+        if status == TaskStatus.IN_PROGRESS.value and not self.started_at:
+            self.started_at = now
+            # Set is_abortable to False when task starts executing
+            # (will be set to True if/when an abort handler is registered)
+            if self.properties.get("is_abortable") is None:
+                self.update_properties({"is_abortable": False})
+        elif status in [
+            TaskStatus.SUCCESS.value,
+            TaskStatus.FAILURE.value,
+            TaskStatus.ABORTED.value,
+            TaskStatus.TIMED_OUT.value,
+        ]:
+            if not self.ended_at:
+                self.ended_at = now
+            # Update dedup_key to UUID to free up the slot for new tasks
+            self.dedup_key = get_finished_dedup_key(self.uuid)
+        # Note: ABORTING status doesn't set ended_at yet - that happens when
+        # the task transitions to ABORTED after handlers complete
+
+    @property
+    def is_pending(self) -> bool:
+        """Check if task is pending."""
+        return self.status == TaskStatus.PENDING.value
+
+    @property
+    def is_running(self) -> bool:
+        """Check if task is currently running."""
+        return self.status == TaskStatus.IN_PROGRESS.value
+
+    @property
+    def is_finished(self) -> bool:
+        """Check if task has finished (success, failure, aborted, or timed 
out)."""
+        return self.status in [
+            TaskStatus.SUCCESS.value,
+            TaskStatus.FAILURE.value,
+            TaskStatus.ABORTED.value,
+            TaskStatus.TIMED_OUT.value,
+        ]
+
+    @property
+    def is_successful(self) -> bool:
+        """Check if task completed successfully."""
+        return self.status == TaskStatus.SUCCESS.value
+
+    @property
+    def duration_seconds(self) -> float | None:
+        """
+        Get task duration in seconds.
+
+        - Finished tasks: Time from started_at to ended_at (None if never started)
+        - Running/aborting tasks: Time from started_at to now
+        - Pending tasks: Time from created_on to now (queue time)
+
+        Note: started_at/ended_at are stored in UTC, but created_on from
+        AuditMixinNullable is stored as naive local time. We handle both cases.
+        """
+        if self.is_finished:
+            # Task has completed - use fixed timestamps, never increment
+            if self.started_at and self.ended_at:
+                # Finished task - both timestamps use the same timezone (UTC)
+                # Just compute the difference directly
+                return (self.ended_at - self.started_at).total_seconds()
+            # Never started (e.g., aborted while pending) - no duration
+            return None
+        elif self.started_at:
+            # Running or aborting - started_at is UTC (set by set_status)
+            # Use UTC now for comparison
+            now = datetime.now(timezone.utc)
+            started = (
+                self.started_at.replace(tzinfo=timezone.utc)
+                if self.started_at.tzinfo is None
+                else self.started_at
+            )
+            return (now - started).total_seconds()
+        elif self.created_on:
+            # Pending - created_on is naive LOCAL time (from AuditMixinNullable)
+            # Use naive local time for comparison
+            now = datetime.now()  # Local time, no timezone
+            created = (
+                self.created_on.replace(tzinfo=None)
+                if self.created_on.tzinfo is not None
+                else self.created_on
+            )
+            return (now - created).total_seconds()
+        return None
+
+    # Scope-related properties
+    @property
+    def is_private(self) -> bool:
+        """Check if task is private (user-specific)."""
+        return self.scope == "private"
+
+    @property
+    def is_shared(self) -> bool:
+        """Check if task is shared (multi-user)."""
+        return self.scope == "shared"
+
+    @property
+    def is_system(self) -> bool:
+        """Check if task is system (admin-only)."""
+        return self.scope == "system"
+
+    # Subscriber-related methods
+    @property
+    def subscriber_count(self) -> int:
+        """Get number of subscribers to this task."""
+        return len(self.subscribers)
+
+    def has_subscriber(self, user_id: int) -> bool:
+        """
+        Check if a user is subscribed to this task.
+
+        :param user_id: User ID to check
+        :returns: True if user is subscribed
+        """
+        return any(sub.user_id == user_id for sub in self.subscribers)
+
+    def get_subscriber_ids(self) -> list[int]:
+        """
+        Get list of all subscriber user IDs.
+
+        :returns: List of user IDs subscribed to this task
+        """
+        return [sub.user_id for sub in self.subscribers]
+
+    def to_dict(self) -> dict[str, Any]:
+        """
+        Convert task to dictionary representation.
+
+        Minimal API payload - frontend derives status booleans and abort logic
+        from status and properties.is_abortable.
+
+        :returns: Dictionary representation of the task
+        """
+        return {
+            "id": self.id,
+            "uuid": self.uuid,
+            "task_key": self.task_key,
+            "task_type": self.task_type,
+            "task_name": self.task_name,
+            "scope": self.scope,
+            "status": self.status,
+            "created_on": self.created_on.isoformat() if self.created_on else 
None,
+            "changed_on": self.changed_on.isoformat() if self.changed_on else 
None,
+            "started_at": self.started_at.isoformat() if self.started_at else 
None,
+            "ended_at": self.ended_at.isoformat() if self.ended_at else None,
+            "created_by_fk": self.created_by_fk,
+            "user_id": self.user_id,
+            "payload": self.get_payload(),
+            "properties": self.properties,  # Already a dict

Review Comment:
   **Suggestion:** `set_error_from_exception` stores a full `stack_trace` in `properties`, and `to_dict` returns `self.properties` verbatim, which means raw stack traces (including file paths and possibly sensitive information) are exposed to API consumers, representing an information disclosure vulnerability; the API should strip stack traces from the serialized payload. [security]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ API payload exposes internal stack traces.
   - ⚠️ Error reporting leaks file paths and internals.
   - ❌ Potential information disclosure to clients.
   ```
   </details>
   
   ```suggestion
               "properties": {
                   key: value
                   for key, value in self.properties.items()
                   if key != "stack_trace"
               },
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Cause a task to raise an exception; the executor (per the docstring in
   superset/models/tasks.py) will call set_error_from_exception(exception)
   (superset/models/tasks.py:186-197).
   
   2. set_error_from_exception calls update_properties(...) 
(superset/models/tasks.py:196),
   which uses error_update(exception) from superset/tasks/utils.py:275-286. 
error_update
   populates "stack_trace" via traceback.format_exc().
   
   3. After the error, call task.to_dict() (superset/models/tasks.py:343-370). 
The
   implementation returns "properties": self.properties verbatim
   (superset/models/tasks.py:366-368), which includes the "stack_trace" key and 
full
   traceback.
   
   4. Observation: the returned dict (intended as a minimal API payload per the 
to_dict()
   docstring) contains raw stack traces and formatted tracebacks, exposing 
internal file
   paths and implementation details to API consumers/front-end.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/models/tasks.py
   **Line:** 367:367
   **Comment:**
        *Security: `set_error_from_exception` stores a full `stack_trace` in 
`properties`, and `to_dict` returns `self.properties` verbatim, which means raw 
stack traces (including file paths and possibly sensitive information) are 
exposed to API consumers, representing an information disclosure vulnerability; 
the API should strip stack traces from the serialized payload.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
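
   If an inline comprehension feels too implicit, a hypothetical redaction helper (the name and key set below are illustrative only, not part of this PR) keeps the policy in one place:

   ```python
   # Keys that should never leave the server in API payloads (illustrative).
   SENSITIVE_PROPERTY_KEYS = {"stack_trace"}


   def redact_properties(properties: dict) -> dict:
       """Return a copy of properties that is safe to expose to API consumers."""
       return {k: v for k, v in properties.items() if k not in SENSITIVE_PROPERTY_KEYS}


   props = {
       "error_message": "division by zero",
       "exception_type": "ZeroDivisionError",
       "stack_trace": "Traceback (most recent call last): ...",
   }
   assert redact_properties(props) == {
       "error_message": "division by zero",
       "exception_type": "ZeroDivisionError",
   }
   ```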



##########
superset/tasks/utils.py:
##########
@@ -142,3 +151,166 @@ def fetch_csrf_token(
 
     logger.error("Error fetching CSRF token, status code: %s", response.status)
     return {}
+
+
+def generate_random_task_key() -> str:
+    """
+    Generate a random task key.
+
+    This is the default behavior - each task submission gets a unique UUID
+    unless an explicit task_key is provided in TaskOptions.
+
+    :returns: A random UUID string
+    """
+    return str(uuid4())
+
+
+def get_active_dedup_key(
+    scope: TaskScope | str,
+    task_type: str,
+    task_key: str,
+    user_id: int | None = None,
+) -> str:
+    """
+    Build a deduplication key for active tasks.
+
+    The dedup_key enforces uniqueness at the database level via a unique index.
+    Active tasks use a composite key based on scope, which is then hashed using
+    the configured HASH_ALGORITHM to produce a fixed-length key.
+
+    The composite key format before hashing is:
+    - Private: private|task_type|task_key|user_id
+    - Shared: shared|task_type|task_key
+    - System: system|task_type|task_key
+
+    The final key is a hash digest (64 chars for sha256, 32 chars for md5).
+
+    :param scope: Task scope (PRIVATE/SHARED/SYSTEM) as TaskScope enum or string
+    :param task_type: Type of task (e.g., 'sql_execution')
+    :param task_key: Task identifier for deduplication
+    :param user_id: User ID for private tasks (falls back to g.user if not provided)
+    :returns: Hashed deduplication key string
+    :raises ValueError: If user_id is missing for private scope
+    """
+    # Convert string to TaskScope if needed
+    if isinstance(scope, str):
+        scope = TaskScope(scope)
+
+    # Build composite key
+    match scope:
+        case TaskScope.PRIVATE:
+            # Use provided user_id, or fall back to current user from Flask context
+            effective_user_id = user_id
+            if effective_user_id is None:
+                from superset.utils.core import get_user_id
+
+                effective_user_id = get_user_id()
+            if effective_user_id is None:
+                raise ValueError("user_id required for private tasks")
+            composite_key = f"{scope.value}|{task_type}|{task_key}|{effective_user_id}"
+        case TaskScope.SHARED:
+            composite_key = f"{scope.value}|{task_type}|{task_key}"
+        case TaskScope.SYSTEM:
+            composite_key = f"{scope.value}|{task_type}|{task_key}"
+        case _:
+            raise ValueError(f"Invalid scope: {scope}")
+
+    # Hash the composite key to produce a fixed-length dedup_key
+    # Truncate to 64 chars max to fit the database column in case
+    # a hash algo is used that generates hashes that exceed 64 chars
+    return hash_from_str(composite_key)[:64]
+
+
+def get_finished_dedup_key(task_uuid: str) -> str:
+    """
+    Build a deduplication key for finished tasks.
+
+    When a task completes (success, failure, or abort), its dedup_key is
+    changed to its UUID. This frees up the slot so new tasks with the same
+    parameters can be created.
+
+    :param task_uuid: Task UUID
+    :returns: The task UUID as the dedup key
+
+    Example:
+        >>> get_finished_dedup_key("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
+        'a1b2c3d4-e5f6-7890-abcd-ef1234567890'
+    """
+    return task_uuid
+
+
+# -----------------------------------------------------------------------------
+# TaskProperties helper functions
+# -----------------------------------------------------------------------------
+
+
+def progress_update(progress: float | int | tuple[int, int]) -> TaskProperties:
+    """
+    Create a properties update dict for progress values.
+
+    :param progress: One of:
+        - float (0.0-1.0): Percentage only
+        - int: Count only (total unknown)
+        - tuple[int, int]: (current, total) with auto-computed percentage
+    :returns: TaskProperties dict with appropriate progress fields set
+
+    Example:
+        task.update_properties(progress_update((50, 100)))
+    """
+    if isinstance(progress, float):
+        return {"progress_percent": progress}
+    if isinstance(progress, int):
+        return {"progress_current": progress}
+    # tuple
+    current, total = progress
+    result: TaskProperties = {
+        "progress_current": current,
+        "progress_total": total,
+    }
+    if total > 0:
+        result["progress_percent"] = current / total
+    return result
+
+
+def error_update(exception: BaseException) -> TaskProperties:
+    """
+    Create a properties update dict from an exception.
+
+    :param exception: The exception that caused the failure
+    :returns: TaskProperties dict with error fields populated
+    """
+    return {
+        "error_message": str(exception),
+        "exception_type": type(exception).__name__,
+        "stack_trace": traceback.format_exc(),
+    }
+
+
+def parse_properties(json_str: str | None) -> TaskProperties:
+    """
+    Parse JSON string into TaskProperties dict.
+
+    Returns empty dict on parse errors. Unknown keys are preserved
+    for forward compatibility (allows adding new properties without
+    breaking existing code).
+
+    :param json_str: JSON string or None
+    :returns: TaskProperties dict (sparse - only contains keys that were set)
+    """
+    if not json_str:
+        return {}
+
+    try:
+        return json.loads(json_str)
+    except (json.JSONDecodeError, TypeError):
+        return {}

Review Comment:
   **Suggestion:** The JSON parsing helper claims to return a `TaskProperties` dict but directly returns the result of `json.loads`, which can be `None` or a list (for inputs like `"null"` or `"[]"`), leading to type mismatches and potential runtime errors when callers assume a dict. [type error]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ Task properties parsing yields non-dict breaking callers.
   - ⚠️ Task UI code expecting dict may raise TypeError.
   - ⚠️ Data-reading code paths may mis-handle stored JSON.
   ```
   </details>
   
   ```suggestion
           value = json.loads(json_str)
       except (json.JSONDecodeError, TypeError):
           return {}
   
       return value if isinstance(value, dict) else {}
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. In a Python REPL or unit test, import the helper at 
`superset/tasks/utils.py:289`:
   
      from superset.tasks.utils import parse_properties
   
   2. Call the function with JSON that is valid but not an object, e.g. the 
JSON literal
   `null`:
   
      result = parse_properties("null")
   
      (This executes `superset/tasks/utils.py:289-306`.)
   
   3. Observe `result` is None (or a non-dict) because `json.loads("null")` 
returns None,
   violating the expected TaskProperties dict contract.
   
   4. Similarly, call with a JSON array: result = parse_properties("[]") and 
observe a list
   returned, which will break callers that assume dict-like access (e.g., 
calling .get()).
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/tasks/utils.py
   **Line:** 304:306
   **Comment:**
        *Type Error: The JSON parsing helper claims to return a 
`TaskProperties` dict but directly returns the result of `json.loads`, which 
can be `None` or a list (for inputs like `"null"` or `"[]"`), leading to type 
mismatches and potential runtime errors when callers assume a dict.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
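
   A standalone sketch of the guarded parser (using stdlib `json` here rather than `superset.utils.json`, which is assumed to expose the same `loads`/`JSONDecodeError` surface):

   ```python
   import json


   def parse_properties(json_str):
       """Only accept JSON objects; anything else falls back to an empty dict."""
       if not json_str:
           return {}
       try:
           value = json.loads(json_str)
       except (json.JSONDecodeError, TypeError):
           return {}
       return value if isinstance(value, dict) else {}


   assert parse_properties('{"progress_percent": 0.5}') == {"progress_percent": 0.5}
   assert parse_properties("null") == {}      # json.loads returns None
   assert parse_properties("[]") == {}        # json.loads returns a list
   assert parse_properties("not json") == {}  # parse error falls back to {}
   ```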



##########
superset/migrations/versions/2025_12_18_0220_create_tasks_table.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Create tasks and task_subscriber tables for Global Task Framework (GTF)
+
+Revision ID: 4b2a8c9d3e1f
+Revises: 9787190b3d89
+Create Date: 2025-12-18 02:20:00.000000
+
+"""
+
+from sqlalchemy import (
+    Column,
+    DateTime,
+    Integer,
+    String,
+    Text,
+    UniqueConstraint,
+)
+
+from superset.migrations.shared.utils import (
+    create_fks_for_table,
+    create_index,
+    create_table,
+    drop_fks_for_table,
+    drop_index,
+    drop_table,
+)
+
+# revision identifiers, used by Alembic.
+revision = "4b2a8c9d3e1f"
+down_revision = "9787190b3d89"
+
+TASKS_TABLE = "tasks"
+TASK_SUBSCRIBERS_TABLE = "task_subscribers"
+
+
+def upgrade():
+    """
+    Create tasks and task_subscribers tables for the Global Task Framework (GTF).
+
+    This migration creates:
+    1. tasks table - unified tracking for all long running tasks
+    2. task_subscribers table - multi-user task subscriptions for shared tasks
+
+    The scope feature allows tasks to be:
+    - private: user-specific (default)
+    - shared: multi-user collaborative tasks
+    - system: admin-only background tasks
+    """
+    # Create tasks table
+    create_table(
+        TASKS_TABLE,
+        Column("id", Integer, primary_key=True),
+        Column("uuid", String(36), nullable=False, unique=True),
+        Column("task_key", String(256), nullable=False),
+        Column("task_type", String(100), nullable=False),
+        Column("task_name", String(256), nullable=True),
+        Column("scope", String(20), nullable=False, server_default="private"),
+        Column("status", String(50), nullable=False),
+        Column("dedup_key", String(64), nullable=False),
+        # AuditMixinNullable columns
+        Column("created_on", DateTime, nullable=True),
+        Column("changed_on", DateTime, nullable=True),
+        Column("created_by_fk", Integer, nullable=True),
+        Column("changed_by_fk", Integer, nullable=True),
+        # Task-specific columns
+        Column("started_at", DateTime, nullable=True),
+        Column("ended_at", DateTime, nullable=True),
+        Column("user_id", Integer, nullable=True),
+        Column("payload", Text, nullable=True),
+        Column("properties", Text, nullable=True),
+    )
+
+    # Create indexes for optimal query performance
+    create_index(TASKS_TABLE, "idx_tasks_dedup_key", ["dedup_key"], 
unique=True)
+    create_index(TASKS_TABLE, "idx_tasks_status", ["status"])
+    create_index(TASKS_TABLE, "idx_tasks_scope", ["scope"])
+    create_index(TASKS_TABLE, "idx_tasks_ended_at", ["ended_at"])
+    create_index(TASKS_TABLE, "idx_tasks_created_by", ["created_by_fk"])
+    create_index(TASKS_TABLE, "idx_tasks_created_on", ["created_on"])
+    create_index(TASKS_TABLE, "idx_tasks_task_type", ["task_type"])
+    create_index(TASKS_TABLE, "idx_tasks_uuid", ["uuid"], unique=True)
+
+    # Create foreign key constraints for tasks
+    create_fks_for_table(
+        foreign_key_name="fk_tasks_created_by_fk_ab_user",
+        table_name=TASKS_TABLE,
+        referenced_table="ab_user",
+        local_cols=["created_by_fk"],
+        remote_cols=["id"],
+        ondelete="SET NULL",
+    )
+
+    create_fks_for_table(
+        foreign_key_name="fk_tasks_changed_by_fk_ab_user",
+        table_name=TASKS_TABLE,
+        referenced_table="ab_user",
+        local_cols=["changed_by_fk"],
+        remote_cols=["id"],
+        ondelete="SET NULL",
+    )
+

Review Comment:
   **Suggestion:** The `user_id` column on the tasks table is defined as an integer without a foreign key constraint, unlike other user-related columns, which can lead to orphaned task records when users are deleted or IDs change and makes data integrity inconsistent with `task_subscribers.user_id`. [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ Tasks retain deleted user IDs (orphaned references).
   - ⚠️ Inconsistent integrity vs task_subscribers.user_id FK.
   - ⚠️ Reporting or joins may show stale task ownership.
   ```
   </details>
   
   ```suggestion
   
       create_fks_for_table(
           foreign_key_name="fk_tasks_user_id_ab_user",
           table_name=TASKS_TABLE,
           referenced_table="ab_user",
           local_cols=["user_id"],
           remote_cols=["id"],
           ondelete="SET NULL",
       )
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Apply the migration by running Alembic upgrade that executes upgrade() in 
this file
   
(`superset/migrations/versions/2025_12_18_0220_create_tasks_table.py:51`-`86`). 
The tasks
   table is created with Column("user_id", Integer, nullable=True) at line `83`.
   
   2. Insert a task row referencing an existing user id: run SQL INSERT INTO 
tasks (uuid,
   task_key, task_type, status, dedup_key, user_id) VALUES (..., user_id=42) 
against the
   database. This exercise touches the table schema created at 
`create_table(...)` in the
   same file (`lines ~65-86`).
   
   3. Delete that user from the users table (ab_user): e.g. DELETE FROM ab_user 
WHERE id =
   42. Because there is no foreign key on tasks.user_id, the tasks row is not 
updated or
   removed.
   
   4. Observe an orphaned tasks.user_id value (stale reference) when querying 
tasks (SELECT
   user_id FROM tasks WHERE uuid = ...). Note: other user-related columns
   (created_by_fk/changed_by_fk) have FKs created via create_fks_for_table 
(lines `99`-`115`)
   with ondelete="SET NULL", demonstrating inconsistent constraints vs user_id.
   
   Explanation: The current pattern intentionally creates FKs for 
created_by_fk/changed_by_fk
   (see `create_fks_for_table` calls at lines `99`-`115`) but omits a FK for 
`user_id`, so
   deleting users leaves orphaned user_id values. This reproduces 
deterministically using the
   migration-created schema and basic SQL operations.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/migrations/versions/2025_12_18_0220_create_tasks_table.py
   **Line:** 116:116
   **Comment:**
        *Logic Error: The `user_id` column on the tasks table is defined as an 
integer without a foreign key constraint, unlike other user-related columns, 
which can lead to orphaned task records when users are deleted or IDs change 
and makes data integrity inconsistent with `task_subscribers.user_id`.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
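
   For reference, the suggested helper call corresponds roughly to the following plain Alembic operation (assuming `create_fks_for_table` wraps `op.create_foreign_key`); it is shown only to make the `ON DELETE SET NULL` behavior explicit and would run inside a migration's `upgrade()`:

   ```python
   from alembic import op

   op.create_foreign_key(
       "fk_tasks_user_id_ab_user",  # constraint name
       "tasks",                     # source table
       "ab_user",                   # referenced table
       ["user_id"],                 # local columns
       ["id"],                      # remote columns
       ondelete="SET NULL",         # deleting a user nulls tasks.user_id
   )
   ```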



##########
superset/models/tasks.py:
##########
@@ -0,0 +1,370 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task model for Global Task Framework (GTF)"""
+
+from __future__ import annotations
+
+import uuid
+from datetime import datetime, timezone
+from typing import Any, cast
+
+from flask_appbuilder import Model
+from sqlalchemy import (
+    Column,
+    DateTime,
+    Integer,
+    String,
+    Text,
+)
+from sqlalchemy.orm import relationship
+from superset_core.api.models import Task as CoreTask
+from superset_core.api.tasks import TaskProperties, TaskStatus
+
+from superset.models.helpers import AuditMixinNullable
+from superset.models.task_subscribers import TaskSubscriber
+from superset.tasks.utils import (
+    error_update,
+    get_finished_dedup_key,
+    parse_properties,
+    serialize_properties,
+)
+from superset.utils import json
+
+
+class Task(CoreTask, AuditMixinNullable, Model):
+    """
+    Concrete Task model for the Global Task Framework (GTF).
+
+    This model represents async tasks in Superset, providing unified tracking
+    for all background operations including SQL queries, thumbnail generation,
+    reports, and other async operations.
+
+    Non-filterable fields (progress, error info, execution config) are stored
+    in a `properties` JSON blob for schema flexibility.
+    """
+
+    __tablename__ = "tasks"
+
+    # Primary key and identifiers
+    id = Column(Integer, primary_key=True)
+    uuid = Column(
+        String(36), nullable=False, unique=True, default=lambda: str(uuid.uuid4())
+    )
+
+    # Task metadata (filterable)
+    task_key = Column(String(256), nullable=False, index=True)  # For deduplication
+    task_type = Column(String(100), nullable=False, index=True)  # e.g., 'sql_execution'
+    task_name = Column(String(256), nullable=True)  # Human readable name
+    scope = Column(
+        String(20), nullable=False, index=True, default="private"
+    )  # private/shared/system
+    status = Column(
+        String(50), nullable=False, index=True, default=TaskStatus.PENDING.value
+    )
+    dedup_key = Column(
+        String(64), nullable=False, unique=True, index=True
+    )  # Hashed deduplication key (SHA-256 = 64 chars, UUID = 36 chars)
+
+    # Timestamps
+    started_at = Column(DateTime, nullable=True)
+    ended_at = Column(DateTime, nullable=True)
+
+    # User context for execution
+    user_id = Column(Integer, nullable=True)
+
+    # Task-specific output data (set by task code via ctx.update_task(payload=...))
+    payload = Column(Text, nullable=True, default="{}")
+
+    # Properties JSON blob - contains runtime state and execution config:
+    # - is_abortable: bool - has abort handler registered
+    # - progress_percent: float - progress 0.0-1.0
+    # - progress_current: int - current iteration count
+    # - progress_total: int - total iterations
+    # - error_message: str - human-readable error message
+    # - exception_type: str - exception class name
+    # - stack_trace: str - full formatted traceback
+    # - timeout: int - timeout in seconds
+    _properties = Column("properties", Text, nullable=True, default="{}")
+
+    # Transient cache for parsed properties (not persisted)
+    _properties_cache: TaskProperties | None = None
+
+    # Relationships
+    subscribers = relationship(
+        TaskSubscriber,
+        back_populates="task",
+        cascade="all, delete-orphan",
+    )
+
+    def __repr__(self) -> str:
+        return f"<Task {self.task_type}:{self.task_key} [{self.status}]>"
+
+    # -------------------------------------------------------------------------
+    # Properties accessor
+    # -------------------------------------------------------------------------
+
+    @property
+    def properties(self) -> TaskProperties:
+        """
+        Get typed properties (cached for performance).
+
+        Properties contain runtime state and execution config that doesn't
+        need database filtering. Parsed once and cached until next write.
+
+        Always use .get() for reads since keys may be absent.
+
+        :returns: TaskProperties dict (sparse - only contains keys that were set)
+        """
+        if self._properties_cache is None:
+            self._properties_cache = parse_properties(self._properties)
+        return self._properties_cache

Review Comment:
   **Suggestion:** The `properties` accessor returns the internal cached dictionary directly, so any caller that mutates the returned dict will update the in-memory cache but not the serialized `_properties` column, leading to confusing, inconsistent state where changes appear to "stick" on the object but are never persisted to the database unless `update_properties` is explicitly called. [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ⚠️ In-memory Task properties differ from persisted DB column.
   - ⚠️ Task updater code may rely on persisted properties.
   - ⚠️ Tests can observe flaky state when mutating properties.
   ```
   </details>
   
   ```suggestion
           # Return a copy so external mutation doesn't desynchronize cache and DB value
           return cast(TaskProperties, dict(self._properties_cache))
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. In a Python REPL or unit test import Task from superset.models.tasks and 
create/persist
   a Task instance (e.g., instantiate Task(...) and commit via SQLAlchemy). The 
properties
   accessor implementation is at superset/models/tasks.py:120-134.
   
   2. Load the persisted Task (SQLAlchemy query) and call the accessor at
   superset/models/tasks.py:121-134:
   
      props = task.properties # returns the internal cached dict (line 133-134)
   
   3. Mutate the returned dict in place:
   
      props["new_key"] = "value"
   
   4. Commit the SQLAlchemy session and inspect task._properties (the DB-backed 
column).
   Because only update_properties (superset/models/tasks.py:136-151) calls
   serialize_properties (superset/tasks/utils.py:309-316) to write to 
_properties, the
   in-memory mutation will not be serialized or persisted. The object will show 
the change in
   memory but the database column (and future new object loads) will not 
reflect it.
   
   5. Observation: direct mutation appears to "stick" on the in-memory Task 
instance, but is
   not persisted unless update_properties(...) 
(superset/models/tasks.py:136-151) or an
   explicit write path is used.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/models/tasks.py
   **Line:** 134:134
   **Comment:**
        *Logic Error: The `properties` accessor returns the internal cached 
dictionary directly, so any caller that mutates the returned dict will update 
the in-memory cache but not the serialized `_properties` column, leading to 
confusing, inconsistent state where changes appear to "stick" on the object but 
are never persisted to the database unless `update_properties` is explicitly 
called.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
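
   The aliasing problem can be reproduced without SQLAlchemy at all; a minimal sketch (plain dicts and a hypothetical stand-in class) of why handing out the cache directly is risky:

   ```python
   class FakeTask:
       """Stand-in for the model: _properties plays the role of the persisted column."""

       def __init__(self) -> None:
           self._properties = "{}"      # serialized column value
           self._properties_cache = {}  # parsed cache

       @property
       def properties(self) -> dict:
           # Current behavior: hands out the cache object itself.
           return self._properties_cache


   task = FakeTask()
   task.properties["progress_percent"] = 0.5

   # The in-memory cache changed, but the "persisted" value did not:
   assert task._properties_cache == {"progress_percent": 0.5}
   assert task._properties == "{}"
   ```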



##########
tests/integration_tests/tasks/api_tests.py:
##########
@@ -0,0 +1,611 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Integration tests for Task REST API"""
+
+from contextlib import contextmanager
+from typing import Generator
+
+import prison
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.models.tasks import Task
+from superset.utils import json
+from tests.integration_tests.base_tests import SupersetTestCase
+from tests.integration_tests.constants import (
+    ADMIN_USERNAME,
+    GAMMA_USERNAME,
+)
+
+
+class TestTaskApi(SupersetTestCase):
+    """Tests for Task REST API"""
+
+    TASK_API_BASE = "api/v1/task"

Review Comment:
   **Suggestion:** The tests call `self.get_user(...)` assuming it is provided by `SupersetTestCase`, but `get_user` is actually a module-level helper in `tests.integration_tests.base_tests`, so at runtime `TestTaskApi` instances will raise `AttributeError` when trying to access `self.get_user`; you can fix this by importing the helper and adding a wrapper method on the test class. [possible bug]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Integration tests tasks API fail with AttributeError.
   - ⚠️ CI job running integration tests will be blocked.
   - ⚠️ Test coverage for Global Task Framework missing.
   ```
   </details>
   
   ```suggestion
   from tests.integration_tests.base_tests import SupersetTestCase, get_user as base_get_user
   from tests.integration_tests.constants import (
       ADMIN_USERNAME,
       GAMMA_USERNAME,
   )
   
   
   class TestTaskApi(SupersetTestCase):
       """Tests for Task REST API"""
   
       TASK_API_BASE = "api/v1/task"
   
       def get_user(self, username: str):
           """
           Convenience wrapper to use the shared get_user helper in tests.
           """
           return base_get_user(username)
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Run any test in this file, e.g. `pytest
   
tests/integration_tests/tasks/api_tests.py::TestTaskApi::test_get_task_by_id`.
   
   2. Test execution enters TestTaskApi._create_tasks
   (tests/integration_tests/tasks/api_tests.py:41) and executes the line
   `admin = self.get_user("admin")` (tests/integration_tests/tasks/api_tests.py:56).
   
   3. Python looks up `get_user` on the TestTaskApi instance (inherited from
   SupersetTestCase). SupersetTestCase does not define a `get_user` method (see
   tests/integration_tests/base_tests.py); instead a module-level helper
   `get_user(username: str)` exists at tests/integration_tests/base_tests.py:256-262.
   
   4. Because `self.get_user` is missing, the call raises AttributeError and the
   test fails immediately with a traceback pointing to
   tests/integration_tests/tasks/api_tests.py:56. The proposed wrapper (importing
   the base-level get_user and adding an instance method) prevents this.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/integration_tests/tasks/api_tests.py
   **Line:** 28:38
   **Comment:**
        *Possible Bug: The tests call `self.get_user(...)` assuming it is 
provided by `SupersetTestCase`, but `get_user` is actually a module-level 
helper in `tests.integration_tests.base_tests`, so at runtime `TestTaskApi` 
instances will raise `AttributeError` when trying to access `self.get_user`; 
you can fix this by importing the helper and adding a wrapper method on the 
test class.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
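   
   As a hedged alternative to the wrapper method, the tests could look users up
   through Flask-AppBuilder's security manager instead of the module-level helper
   (sketch only; adjust to the file's conventions):
   
   ```python
   from superset import security_manager
   
   # Inside a test method on TestTaskApi; find_user() is Flask-AppBuilder API.
   admin = security_manager.find_user(username=ADMIN_USERNAME)
   gamma = security_manager.find_user(username=GAMMA_USERNAME)
   ```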



##########
tests/unit_tests/tasks/test_utils.py:
##########
@@ -330,3 +340,248 @@ def test_get_executor(
         )
         assert executor_type == expected_executor_type
         assert executor == expected_executor
+
+
+@pytest.mark.parametrize(
+    "scope,task_type,task_key,user_id,expected_composite_key",
+    [
+        # Private tasks with TaskScope enum
+        (
+            TaskScope.PRIVATE,
+            "sql_execution",
+            "chart_123",
+            42,
+            "private|sql_execution|chart_123|42",
+        ),
+        (
+            TaskScope.PRIVATE,
+            "thumbnail_gen",
+            "dash_456",
+            100,
+            "private|thumbnail_gen|dash_456|100",
+        ),
+        # Private tasks with string scope
+        (
+            "private",
+            "api_call",
+            "endpoint_789",
+            200,
+            "private|api_call|endpoint_789|200",
+        ),
+        # Shared tasks with TaskScope enum
+        (
+            TaskScope.SHARED,
+            "report_gen",
+            "monthly_report",
+            None,
+            "shared|report_gen|monthly_report",
+        ),
+        (
+            TaskScope.SHARED,
+            "export_csv",
+            "large_export",
+            999,  # user_id should be ignored for shared
+            "shared|export_csv|large_export",
+        ),
+        # Shared tasks with string scope
+        (
+            "shared",
+            "batch_process",
+            "batch_001",
+            123,  # user_id should be ignored for shared
+            "shared|batch_process|batch_001",
+        ),
+        # System tasks with TaskScope enum
+        (
+            TaskScope.SYSTEM,
+            "cleanup_task",
+            "daily_cleanup",
+            None,
+            "system|cleanup_task|daily_cleanup",
+        ),
+        (
+            TaskScope.SYSTEM,
+            "db_migration",
+            "version_123",
+            1,  # user_id should be ignored for system
+            "system|db_migration|version_123",
+        ),
+        # System tasks with string scope
+        (
+            "system",
+            "maintenance",
+            "nightly_job",
+            2,  # user_id should be ignored for system
+            "system|maintenance|nightly_job",
+        ),
+    ],
+)
+def test_get_active_dedup_key(
+    scope, task_type, task_key, user_id, expected_composite_key, mocker, app_context
+):
+    """Test get_active_dedup_key generates a hash of the composite key.
+
+    The function hashes the composite key using the configured HASH_ALGORITHM
+    to produce a fixed-length dedup_key for database storage. The result is
+    truncated to 64 chars to fit the database column.
+    """
+    # Mock get_user_id to return the specified user_id
+    mocker.patch("superset.utils.core.get_user_id", return_value=user_id)

Review Comment:
   **Suggestion:** The test mocks `get_user_id` on the `superset.utils.core` 
module, but the function under test `get_active_dedup_key` is imported from 
`superset.tasks.utils`, so if that module imported `get_user_id` directly (e.g. 
`from superset.utils.core import get_user_id`), this patch will not affect the 
function call and the test will read the real user ID instead of the mocked 
one, making the test brittle or incorrect depending on the environment. 
[possible bug]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ⚠️ Unit test for dedup key may be flaky in CI.
   - ⚠️ tests/unit_tests/tasks/test_utils.py affected (get_active_dedup_key).
   ```
   </details>
   
   ```suggestion
       mocker.patch("superset.tasks.utils.get_user_id", return_value=user_id)
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Open the test at tests/unit_tests/tasks/test_utils.py:419 (definition of
   test_get_active_dedup_key). Note the mock is applied at lines 428-429.
   
   2. Inspect superset/tasks/utils.py to determine how get_user_id is 
referenced. If that
   module did `from superset.utils.core import get_user_id` then calls within 
tasks.utils
   resolve to superset.tasks.utils.get_user_id (check top-of-file imports in
   superset/tasks/utils.py).
   
   3. Run pytest for this single test: pytest
   tests/unit_tests/tasks/test_utils.py::test_get_active_dedup_key - observe 
behavior. The
   test patches superset.utils.core.get_user_id 
(tests/unit_tests/tasks/test_utils.py:429)
   but not superset.tasks.utils.get_user_id.
   
   4. If tasks.utils imported get_user_id into its own namespace, 
get_active_dedup_key will
   call the unpatched local reference and the test may observe the real user id 
(causing
   wrong hash or test failure). If tasks.utils uses 
superset.utils.core.get_user_id at call
   time (no direct import), the existing patch will succeed. This reproduces 
the brittle
   behavior and identifies whether the patch target is incorrect.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/unit_tests/tasks/test_utils.py
   **Line:** 429:429
   **Comment:**
        *Possible Bug: The test mocks `get_user_id` on the 
`superset.utils.core` module, but the function under test 
`get_active_dedup_key` is imported from `superset.tasks.utils`, so if that 
module imported `get_user_id` directly (e.g. `from superset.utils.core import 
get_user_id`), this patch will not affect the function call and the test will 
read the real user ID instead of the mocked one, making the test brittle or 
incorrect depending on the environment.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
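   
   For reference, a self-contained illustration of the "patch where it is looked
   up" rule this comment hinges on, using throwaway modules rather than Superset
   code (all names below are hypothetical):
   
   ```python
   import sys
   import types
   from unittest import mock
   
   # "Library" module that owns the real implementation.
   helpers = types.ModuleType("helpers")
   helpers.current_user_id = lambda: 999
   sys.modules["helpers"] = helpers
   
   # "Consumer" module that bound the name at import time, the equivalent of
   # `from helpers import current_user_id`.
   consumer = types.ModuleType("consumer")
   consumer.current_user_id = helpers.current_user_id
   consumer.build_key = lambda: f"private|{consumer.current_user_id()}"
   sys.modules["consumer"] = consumer
   
   # Patching the defining module does not affect the consumer's bound name...
   with mock.patch("helpers.current_user_id", return_value=42):
       assert consumer.build_key() == "private|999"
   
   # ...patching the name where it is used does.
   with mock.patch("consumer.current_user_id", return_value=42):
       assert consumer.build_key() == "private|42"
   ```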



##########
superset/daos/tasks.py:
##########
@@ -0,0 +1,309 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task DAO for Global Task Framework (GTF)"""
+
+import logging
+from datetime import datetime, timezone
+from typing import Any
+
+from superset_core.api.tasks import TaskProperties, TaskScope, TaskStatus
+
+from superset.daos.base import BaseDAO
+from superset.daos.exceptions import DAODeleteFailedError
+from superset.extensions import db
+from superset.models.task_subscribers import TaskSubscriber
+from superset.models.tasks import Task
+from superset.tasks.constants import ABORTABLE_STATES
+from superset.tasks.filters import TaskFilter
+from superset.tasks.utils import get_active_dedup_key
+
+logger = logging.getLogger(__name__)
+
+
+class TaskDAO(BaseDAO[Task]):
+    """
+    Concrete TaskDAO for the Global Task Framework (GTF).
+
+    Provides database access operations for async tasks including
+    creation, status management, filtering, and subscription management
+    for shared tasks.
+    """
+
+    base_filter = TaskFilter
+
+    @classmethod
+    def find_by_task_key(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+    ) -> Task | None:
+        """
+        Find active task by type, key, scope, and user.
+
+        Uses dedup_key internally for efficient querying with a unique index.
+        Only returns tasks that are active (pending or in progress).
+
+        Uniqueness logic by scope:
+        - private: scope + task_type + task_key + user_id
+        - shared/system: scope + task_type + task_key (user-agnostic)
+
+        :param task_type: Task type to filter by
+        :param task_key: Task identifier for deduplication
+        :param scope: Task scope (private/shared/system)
+        :param user_id: User ID (required for private tasks)
+        :returns: Task instance or None if not found or not active
+        """
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Simple single-column query with unique index
+        return db.session.query(Task).filter(Task.dedup_key == dedup_key).one_or_none()
+
+    @classmethod
+    def create_task(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+        payload: dict[str, Any] | None = None,
+        properties: TaskProperties | None = None,
+        **kwargs: Any,
+    ) -> Task:
+        """
+        Create a new task record in the database.
+
+        This is a pure data operation - assumes caller holds lock and has
+        already checked for existing tasks. Business logic (create vs join)
+        is handled by SubmitTaskCommand.
+
+        :param task_type: Type of task to create
+        :param task_key: Task identifier (required)
+        :param scope: Task scope (private/shared/system), defaults to private
+        :param user_id: User ID creating the task
+        :param payload: Optional user-defined context data (dict)
+        :param properties: Optional framework-managed runtime state (e.g., timeout)
+        :param kwargs: Additional task attributes (e.g., task_name)
+        :returns: Created Task instance
+        """
+        # Build dedup_key for active task
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Handle both TaskScope enum and string values
+        scope_value = scope.value if isinstance(scope, TaskScope) else scope
+
+        # Note: properties is handled separately via update_properties()
+        # because it's a hybrid property with only a getter
+        task_data = {
+            "task_type": task_type,
+            "task_key": task_key,
+            "scope": scope_value,
+            "status": TaskStatus.PENDING.value,
+            "dedup_key": dedup_key,
+            **kwargs,
+        }
+
+        # Handle payload - serialize to JSON if dict provided
+        if payload:
+            from superset.utils import json
+
+            task_data["payload"] = json.dumps(payload)
+
+        if user_id is not None:
+            task_data["user_id"] = user_id
+
+        task = cls.create(attributes=task_data)
+
+        # Set properties after creation (hybrid property with getter only)
+        if properties:
+            task.update_properties(properties)
+
+        # Flush to get the task ID (auto-incremented primary key)
+        db.session.flush()
+
+        # Auto-subscribe creator for all tasks
+        # This enables consistent subscriber display across all task types
+        if user_id:
+            cls.add_subscriber(task.id, user_id)
+            logger.info(
+                "Creator %s auto-subscribed to task: %s (scope: %s)",
+                user_id,
+                task_key,
+                scope_value,
+            )
+
+        logger.info(
+            "Created new async task: %s (type: %s, scope: %s)",
+            task_key,
+            task_type,
+            scope_value,
+        )
+        return task
+
+    @classmethod
+    def abort_task(cls, task_uuid: str, skip_base_filter: bool = False) -> Task | None:
+        """
+        Abort a task by UUID.
+
+        This is a pure data operation. Business logic (subscriber count checks,
+        permission validation) is handled by CancelTaskCommand which holds the lock.
+
+        Abort behavior by status:
+        - PENDING: Goes directly to ABORTED (always abortable)
+        - IN_PROGRESS with is_abortable=True: Goes to ABORTING
+        - IN_PROGRESS with is_abortable=False/None: Raises TaskNotAbortableError
+        - ABORTING: Returns task (idempotent)
+        - Finished statuses: Returns None
+
+        Note: Caller is responsible for calling TaskManager.publish_abort() AFTER
+        the transaction commits if task.status == ABORTING. This prevents race
+        conditions where listeners check the DB before the status is visible.
+
+        :param task_uuid: UUID of task to abort
+        :param skip_base_filter: If True, skip base filter (for admin abortions)
+        :returns: Task if aborted/aborting, None if not found or already finished
+        :raises TaskNotAbortableError: If in-progress task has no abort handler
+        """
+        from superset.commands.tasks.exceptions import TaskNotAbortableError
+
+        task = cls.find_one_or_none(skip_base_filter=skip_base_filter, uuid=task_uuid)
+        if not task:
+            return None
+
+        # Already aborting - idempotent success
+        if task.status == TaskStatus.ABORTING.value:
+            logger.info("Task %s is already aborting", task_uuid)
+            return task
+
+        # Already finished - cannot abort
+        if task.status not in ABORTABLE_STATES:
+            return None
+
+        # PENDING: Go directly to ABORTED
+        if task.status == TaskStatus.PENDING.value:
+            task.set_status(TaskStatus.ABORTED)
+            logger.info("Aborted pending task: %s (scope: %s)", task_uuid, 
task.scope)
+            return task
+
+        # IN_PROGRESS: Check if abortable
+        if task.status == TaskStatus.IN_PROGRESS.value:
+            if task.properties.get("is_abortable") is not True:
+                raise TaskNotAbortableError(
+                    f"Task {task_uuid} is in progress but has not registered "
+                    "an abort handler (is_abortable is not true)"
+                )
+
+            # Transition to ABORTING (not ABORTED yet)
+            task.status = TaskStatus.ABORTING.value
+            db.session.merge(task)
+            logger.info("Set task %s to ABORTING (scope: %s)", task_uuid, 
task.scope)
+
+            # NOTE: publish_abort is NOT called here - caller handles it after commit
+            # This prevents race conditions where listeners check DB before commit
+
+            return task
+
+        return None
+
+    # Subscription management methods
+
+    @classmethod
+    def add_subscriber(cls, task_id: int, user_id: int) -> bool:
+        """
+        Add a user as a subscriber to a task.
+
+        :param task_id: ID of the task
+        :param user_id: ID of the user to subscribe
+        :returns: True if subscriber was added, False if already exists
+        """
+        # Check first to avoid IntegrityError which invalidates the session
+        # in nested transaction contexts (IntegrityError can't be recovered from)
+        existing = (
+            db.session.query(TaskSubscriber)
+            .filter_by(task_id=task_id, user_id=user_id)
+            .first()
+        )
+        if existing:
+            logger.debug(
+                "Subscriber %s already subscribed to task %s", user_id, task_id
+            )
+            return False
+
+        subscription = TaskSubscriber(
+            task_id=task_id,
+            user_id=user_id,
+            subscribed_at=datetime.now(timezone.utc),
+        )
+        db.session.add(subscription)
+        db.session.flush()

Review Comment:
   **Suggestion:** The subscriber addition logic checks for existence before 
inserting, but under concurrent calls a unique constraint violation can still 
occur on flush, raising an IntegrityError that leaves the session in a failed 
state and breaks the transaction. The fix is to catch IntegrityError around 
flush (after the add) and treat it as an "already subscribed" case while 
rolling back the failed operation to keep the session usable. [race condition]
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ❌ Concurrent subscription attempts can cause session failure.
   - ⚠️ Task creation auto-subscribe may trigger IntegrityError.
   - ⚠️ Unhandled DB error surfaces as 500 responses.
   ```
   </details>
   
   ```suggestion
           from sqlalchemy.exc import IntegrityError
   
           # Check first to avoid IntegrityError which invalidates the session
            # in nested transaction contexts (IntegrityError can't be recovered from)
           existing = (
               db.session.query(TaskSubscriber)
               .filter_by(task_id=task_id, user_id=user_id)
               .first()
           )
           if existing:
               logger.debug(
                   "Subscriber %s already subscribed to task %s", user_id, 
task_id
               )
               return False
   
           subscription = TaskSubscriber(
               task_id=task_id,
               user_id=user_id,
               subscribed_at=datetime.now(timezone.utc),
           )
           db.session.add(subscription)
           try:
               db.session.flush()
           except IntegrityError:
               db.session.rollback()
               logger.debug(
                   "Subscriber %s already subscribed to task %s 
(IntegrityError)",
                   user_id,
                   task_id,
               )
               return False
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Inspect add_subscriber implementation at superset/daos/tasks.py:236-265 — 
it checks
   existence then adds and flushes without handling IntegrityError.
   
   2. Trigger two concurrent subscribe attempts for the same (task_id, user_id) 
pair (for
   example: simultaneous HTTP requests or concurrent workers calling 
TaskDAO.add_subscriber
   during task creation/auto-subscription at superset/daos/tasks.py:140 
create_task ->
   add_subscriber).
   
   3. Both processes call the existence check; both see no existing row, both 
call
   db.session.add(subscription). The first flush succeeds; the second flush 
raises
   sqlalchemy.exc.IntegrityError due to unique constraint.
   
   4. Observe the second request leaves db.session in a failed state (unhandled
   IntegrityError), causing transaction/session failure and likely a 500 
response; this is
   reproducible from the function location and realistic under concurrent usage.
   ```
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/daos/tasks.py
   **Line:** 245:263
   **Comment:**
        *Race Condition: The subscriber addition logic checks for existence 
before inserting, but under concurrent calls a unique constraint violation can 
still occur on flush, raising an IntegrityError that leaves the session in a 
failed state and breaks the transaction. The fix is to catch IntegrityError 
around flush (after the add) and treat it as an "already subscribed" case while 
rolling back the failed operation to keep the session usable.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
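   
   A hedged alternative to rolling back the whole session: wrap only the insert
   in a SAVEPOINT via `begin_nested()`, so a concurrent duplicate undoes just the
   subscriber row while the enclosing transaction (including the task created
   earlier in `create_task`) stays usable. Sketch only, replacing the add/flush
   at the end of `add_subscriber` and assuming the backing database and session
   support SAVEPOINTs:
   
   ```python
   from sqlalchemy.exc import IntegrityError
   
   try:
       # The nested block flushes on exit; a unique-constraint violation rolls
       # back to the SAVEPOINT and re-raises, leaving the outer transaction intact.
       with db.session.begin_nested():
           db.session.add(
               TaskSubscriber(
                   task_id=task_id,
                   user_id=user_id,
                   subscribed_at=datetime.now(timezone.utc),
               )
           )
   except IntegrityError:
       # Row was inserted concurrently; treat as "already subscribed".
       logger.debug("Subscriber %s already subscribed to task %s", user_id, task_id)
       return False
   return True
   ```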



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
