codeant-ai-for-open-source[bot] commented on code in PR #36368:
URL: https://github.com/apache/superset/pull/36368#discussion_r2748720072
##########
superset/tasks/utils.py:
##########
@@ -142,3 +151,166 @@ def fetch_csrf_token(
logger.error("Error fetching CSRF token, status code: %s", response.status)
return {}
+
+
+def generate_random_task_key() -> str:
+ """
+ Generate a random task key.
+
+ This is the default behavior - each task submission gets a unique UUID
+ unless an explicit task_key is provided in TaskOptions.
+
+ :returns: A random UUID string
+ """
+ return str(uuid4())
+
+
+def get_active_dedup_key(
+ scope: TaskScope | str,
+ task_type: str,
+ task_key: str,
+ user_id: int | None = None,
+) -> str:
+ """
+ Build a deduplication key for active tasks.
+
+ The dedup_key enforces uniqueness at the database level via a unique index.
+ Active tasks use a composite key based on scope, which is then hashed using
+ the configured HASH_ALGORITHM to produce a fixed-length key.
+
+ The composite key format before hashing is:
+ - Private: private|task_type|task_key|user_id
+ - Shared: shared|task_type|task_key
+ - System: system|task_type|task_key
+
+ The final key is a hash digest (64 chars for sha256, 32 chars for md5).
+
+ :param scope: Task scope (PRIVATE/SHARED/SYSTEM) as TaskScope enum or string
+ :param task_type: Type of task (e.g., 'sql_execution')
+ :param task_key: Task identifier for deduplication
+ :param user_id: User ID for private tasks (falls back to g.user if not provided)
+ :returns: Hashed deduplication key string
+ :raises ValueError: If user_id is missing for private scope
+ """
+ # Convert string to TaskScope if needed
+ if isinstance(scope, str):
+ scope = TaskScope(scope)
+
+ # Build composite key
+ match scope:
+ case TaskScope.PRIVATE:
+ # Use provided user_id, or fall back to current user from Flask context
+ effective_user_id = user_id
+ if effective_user_id is None:
+ from superset.utils.core import get_user_id
+
+ effective_user_id = get_user_id()
+ if effective_user_id is None:
+ raise ValueError("user_id required for private tasks")
+ composite_key = f"{scope.value}|{task_type}|{task_key}|{effective_user_id}"
+ case TaskScope.SHARED:
+ composite_key = f"{scope.value}|{task_type}|{task_key}"
+ case TaskScope.SYSTEM:
+ composite_key = f"{scope.value}|{task_type}|{task_key}"
+ case _:
+ raise ValueError(f"Invalid scope: {scope}")
+
+ # Hash the composite key to produce a fixed-length dedup_key
+ # Truncate to 64 chars max to fit the database column in case
+ # a hash algo is used that generates hashes that exceed 64 chars
+ return hash_from_str(composite_key)[:64]
+
+
+def get_finished_dedup_key(task_uuid: str) -> str:
+ """
+ Build a deduplication key for finished tasks.
+
+ When a task completes (success, failure, or abort), its dedup_key is
+ changed to its UUID. This frees up the slot so new tasks with the same
+ parameters can be created.
+
+ :param task_uuid: Task UUID
+ :returns: The task UUID as the dedup key
+
+ Example:
+ >>> get_finished_dedup_key("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
+ 'a1b2c3d4-e5f6-7890-abcd-ef1234567890'
+ """
+ return task_uuid
+
+
+# -----------------------------------------------------------------------------
+# TaskProperties helper functions
+# -----------------------------------------------------------------------------
+
+
+def progress_update(progress: float | int | tuple[int, int]) -> TaskProperties:
+ """
+ Create a properties update dict for progress values.
+
+ :param progress: One of:
+ - float (0.0-1.0): Percentage only
+ - int: Count only (total unknown)
+ - tuple[int, int]: (current, total) with auto-computed percentage
+ :returns: TaskProperties dict with appropriate progress fields set
+
+ Example:
+ task.update_properties(progress_update((50, 100)))
+ """
+ if isinstance(progress, float):
+ return {"progress_percent": progress}
+ if isinstance(progress, int):
+ return {"progress_current": progress}
+ # tuple
+ current, total = progress
+ result: TaskProperties = {
+ "progress_current": current,
+ "progress_total": total,
+ }
+ if total > 0:
+ result["progress_percent"] = current / total
+ return result
+
+
+def error_update(exception: BaseException) -> TaskProperties:
+ """
+ Create a properties update dict from an exception.
+
+ :param exception: The exception that caused the failure
+ :returns: TaskProperties dict with error fields populated
+ """
+ return {
+ "error_message": str(exception),
+ "exception_type": type(exception).__name__,
+ "stack_trace": traceback.format_exc(),
Review Comment:
**Suggestion:** The stack trace in the error properties is taken from
`traceback.format_exc()` instead of the passed-in exception, so if
`error_update` is called outside an `except` block (or after a different
exception was handled) it will record an empty or incorrect traceback unrelated
to the actual exception object. [logic error]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ❌ Task error reports store incorrect stack traces.
- ⚠️ Task debugging information becomes misleading.
- ⚠️ Maintainers waste time investigating wrong traces.
```
</details>
```suggestion
"stack_trace": "".join(
traceback.format_exception(
type(exception), exception, exception.__traceback__
)
),
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. In a Python REPL or test, import the helper at
`superset/tasks/utils.py:275`:
from superset.tasks.utils import error_update
2. Create an exception object and call the helper outside an except block:
ex = ValueError("test")
props = error_update(ex)
(This calls `superset/tasks/utils.py:275-286`.)
3. Observe `props["stack_trace"]` contains the most recent handled traceback
(or an
empty/irrelevant trace)
because `traceback.format_exc()` uses the current exception context, not
the passed
`ex`.
4. Alternatively, reproduce by calling `error_update()` after handling a
different
exception:
try:
raise KeyError("other")
except KeyError:
pass
props = error_update(ValueError("test"))
The recorded stack trace corresponds to the earlier KeyError (or is
empty), not the
ValueError passed in.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/tasks/utils.py
**Line:** 285:285
**Comment:**
*Logic Error: The stack trace in the error properties is taken from
`traceback.format_exc()` instead of the passed-in exception, so if
`error_update` is called outside an `except` block (or after a different
exception was handled) it will record an empty or incorrect traceback unrelated
to the actual exception object.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
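For reference, a minimal standalone sketch of the difference (not part of the PR; plain stdlib `traceback` only):

```python
import traceback

# Outside an ``except`` block there is no "current" exception, so
# format_exc() has nothing useful to report for the passed-in object.
ex = ValueError("test")
print(traceback.format_exc())  # "NoneType: None" - unrelated to ``ex``

# format_exception() walks ex.__traceback__ instead, so the output always
# describes the exception object that was actually handed to the helper.
print(
    "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
)  # "ValueError: test" (plus frames whenever the exception was raised)
```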
##########
tests/unit_tests/tasks/test_utils.py:
##########
@@ -330,3 +340,248 @@ def test_get_executor(
)
assert executor_type == expected_executor_type
assert executor == expected_executor
+
+
[email protected](
+ "scope,task_type,task_key,user_id,expected_composite_key",
+ [
+ # Private tasks with TaskScope enum
+ (
+ TaskScope.PRIVATE,
+ "sql_execution",
+ "chart_123",
+ 42,
+ "private|sql_execution|chart_123|42",
+ ),
+ (
+ TaskScope.PRIVATE,
+ "thumbnail_gen",
+ "dash_456",
+ 100,
+ "private|thumbnail_gen|dash_456|100",
+ ),
+ # Private tasks with string scope
+ (
+ "private",
+ "api_call",
+ "endpoint_789",
+ 200,
+ "private|api_call|endpoint_789|200",
+ ),
+ # Shared tasks with TaskScope enum
+ (
+ TaskScope.SHARED,
+ "report_gen",
+ "monthly_report",
+ None,
+ "shared|report_gen|monthly_report",
+ ),
+ (
+ TaskScope.SHARED,
+ "export_csv",
+ "large_export",
+ 999, # user_id should be ignored for shared
+ "shared|export_csv|large_export",
+ ),
+ # Shared tasks with string scope
+ (
+ "shared",
+ "batch_process",
+ "batch_001",
+ 123, # user_id should be ignored for shared
+ "shared|batch_process|batch_001",
+ ),
+ # System tasks with TaskScope enum
+ (
+ TaskScope.SYSTEM,
+ "cleanup_task",
+ "daily_cleanup",
+ None,
+ "system|cleanup_task|daily_cleanup",
+ ),
+ (
+ TaskScope.SYSTEM,
+ "db_migration",
+ "version_123",
+ 1, # user_id should be ignored for system
+ "system|db_migration|version_123",
+ ),
+ # System tasks with string scope
+ (
+ "system",
+ "maintenance",
+ "nightly_job",
+ 2, # user_id should be ignored for system
+ "system|maintenance|nightly_job",
+ ),
+ ],
+)
+def test_get_active_dedup_key(
+ scope, task_type, task_key, user_id, expected_composite_key, mocker, app_context
+):
+ """Test get_active_dedup_key generates a hash of the composite key.
+
+ The function hashes the composite key using the configured HASH_ALGORITHM
+ to produce a fixed-length dedup_key for database storage. The result is
+ truncated to 64 chars to fit the database column.
+ """
+ # Mock get_user_id to return the specified user_id
+ mocker.patch("superset.utils.core.get_user_id", return_value=user_id)
+
+ result = get_active_dedup_key(scope, task_type, task_key)
+
+ # The result should be a hash of the expected composite key, truncated to 64 chars
+ expected_hash = hash_from_str(expected_composite_key)[:64]
+ assert result == expected_hash
+ assert len(result) <= 64
+
+
+def test_get_active_dedup_key_private_requires_user_id(mocker):
+ """Test that private tasks require user_id from get_user_id()"""
+ # Mock get_user_id to return None
+ mocker.patch("superset.utils.core.get_user_id", return_value=None)
+
Review Comment:
**Suggestion:** Similarly, this test for the error path of private tasks
patches `get_user_id` on `superset.utils.core`, which will not intercept calls
if `superset.tasks.utils.get_active_dedup_key` imported `get_user_id` directly;
in that case `get_active_dedup_key` will see a real user ID and not raise the
expected `ValueError`, causing the test to give false negatives. [possible bug]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ❌ This test may silently pass or fail incorrectly in CI.
- ⚠️ Affects dedup key validation negative-path test.
```
</details>
```suggestion
mocker.patch("superset.tasks.utils.get_user_id", return_value=None)
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Open tests/unit_tests/tasks/test_utils.py at the test function
test_get_active_dedup_key_private_requires_user_id (starts at line ~440).
The mock is set
at lines 442-443.
2. Check superset/tasks/utils.py to see whether get_active_dedup_key calls
get_user_id via
a local import (e.g., `from superset.utils.core import get_user_id`) or by
referencing
superset.utils.core.get_user_id at call-time.
3. Run pytest for the failing test: pytest
tests/unit_tests/tasks/test_utils.py::test_get_active_dedup_key_private_requires_user_id.
The test patches superset.utils.core.get_user_id but not
superset.tasks.utils.get_user_id.
4. If tasks.utils bound get_user_id at import time, the test will not
observe the patched
None and get_active_dedup_key will not raise ValueError at line where it
validates private
tasks — reproducing the false-negative test behavior. If tasks.utils calls
the core
function dynamically, the current patch succeeds.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** tests/unit_tests/tasks/test_utils.py
**Line:** 443:443
**Comment:**
*Possible Bug: Similarly, this test for the error path of private tasks
patches `get_user_id` on `superset.utils.core`, which will not intercept calls
if `superset.tasks.utils.get_active_dedup_key` imported `get_user_id` directly;
in that case `get_active_dedup_key` will see a real user ID and not raise the
expected `ValueError`, causing the test to give false negatives.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
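Note that the hunk above imports `get_user_id` lazily inside the function body, in which case the existing patch target is resolved at call time and does work; the sketch below (toy modules, not the actual Superset code paths) only illustrates the failure mode the comment warns about when the name is bound at import time:

```python
import types
from unittest import mock

source = types.ModuleType("source")
source.get_user_id = lambda: 42

consumer = types.ModuleType("consumer")
consumer.get_user_id = source.get_user_id  # simulates ``from source import get_user_id``

def build_key():
    return consumer.get_user_id()

with mock.patch.object(source, "get_user_id", return_value=None):
    print(build_key())  # 42 - the import-time binding never sees the patch

with mock.patch.object(consumer, "get_user_id", return_value=None):
    print(build_key())  # None - patched where the name is looked up
```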
##########
superset/models/tasks.py:
##########
@@ -0,0 +1,370 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task model for Global Task Framework (GTF)"""
+
+from __future__ import annotations
+
+import uuid
+from datetime import datetime, timezone
+from typing import Any, cast
+
+from flask_appbuilder import Model
+from sqlalchemy import (
+ Column,
+ DateTime,
+ Integer,
+ String,
+ Text,
+)
+from sqlalchemy.orm import relationship
+from superset_core.api.models import Task as CoreTask
+from superset_core.api.tasks import TaskProperties, TaskStatus
+
+from superset.models.helpers import AuditMixinNullable
+from superset.models.task_subscribers import TaskSubscriber
+from superset.tasks.utils import (
+ error_update,
+ get_finished_dedup_key,
+ parse_properties,
+ serialize_properties,
+)
+from superset.utils import json
+
+
+class Task(CoreTask, AuditMixinNullable, Model):
+ """
+ Concrete Task model for the Global Task Framework (GTF).
+
+ This model represents async tasks in Superset, providing unified tracking
+ for all background operations including SQL queries, thumbnail generation,
+ reports, and other async operations.
+
+ Non-filterable fields (progress, error info, execution config) are stored
+ in a `properties` JSON blob for schema flexibility.
+ """
+
+ __tablename__ = "tasks"
+
+ # Primary key and identifiers
+ id = Column(Integer, primary_key=True)
+ uuid = Column(
+ String(36), nullable=False, unique=True, default=lambda: str(uuid.uuid4())
+ )
+
+ # Task metadata (filterable)
+ task_key = Column(String(256), nullable=False, index=True) # For deduplication
+ task_type = Column(String(100), nullable=False, index=True) # e.g., 'sql_execution'
+ task_name = Column(String(256), nullable=True) # Human readable name
+ scope = Column(
+ String(20), nullable=False, index=True, default="private"
+ ) # private/shared/system
+ status = Column(
+ String(50), nullable=False, index=True, default=TaskStatus.PENDING.value
+ )
+ dedup_key = Column(
+ String(64), nullable=False, unique=True, index=True
+ ) # Hashed deduplication key (SHA-256 = 64 chars, UUID = 36 chars)
+
+ # Timestamps
+ started_at = Column(DateTime, nullable=True)
+ ended_at = Column(DateTime, nullable=True)
+
+ # User context for execution
+ user_id = Column(Integer, nullable=True)
+
+ # Task-specific output data (set by task code via ctx.update_task(payload=...))
+ payload = Column(Text, nullable=True, default="{}")
+
+ # Properties JSON blob - contains runtime state and execution config:
+ # - is_abortable: bool - has abort handler registered
+ # - progress_percent: float - progress 0.0-1.0
+ # - progress_current: int - current iteration count
+ # - progress_total: int - total iterations
+ # - error_message: str - human-readable error message
+ # - exception_type: str - exception class name
+ # - stack_trace: str - full formatted traceback
+ # - timeout: int - timeout in seconds
+ _properties = Column("properties", Text, nullable=True, default="{}")
+
+ # Transient cache for parsed properties (not persisted)
+ _properties_cache: TaskProperties | None = None
+
+ # Relationships
+ subscribers = relationship(
+ TaskSubscriber,
+ back_populates="task",
+ cascade="all, delete-orphan",
+ )
+
+ def __repr__(self) -> str:
+ return f"<Task {self.task_type}:{self.task_key} [{self.status}]>"
+
+ # -------------------------------------------------------------------------
+ # Properties accessor
+ # -------------------------------------------------------------------------
+
+ @property
+ def properties(self) -> TaskProperties:
+ """
+ Get typed properties (cached for performance).
+
+ Properties contain runtime state and execution config that doesn't
+ need database filtering. Parsed once and cached until next write.
+
+ Always use .get() for reads since keys may be absent.
+
+ :returns: TaskProperties dict (sparse - only contains keys that were set)
+ """
+ if self._properties_cache is None:
+ self._properties_cache = parse_properties(self._properties)
+ return self._properties_cache
+
+ def update_properties(self, updates: TaskProperties) -> None:
+ """
+ Update specific properties fields (merge semantics).
+
+ Only updates fields present in the updates dict.
+
+ :param updates: TaskProperties dict with fields to update
+
+ Example:
+ task.update_properties({"is_abortable": True})
+ task.update_properties(progress_update((50, 100)))
+ """
+ current = cast(TaskProperties, dict(self.properties))
+ current.update(updates) # Merge updates
+ self._properties = serialize_properties(current)
+ self._properties_cache = current # Update cache
+
+ # -------------------------------------------------------------------------
+ # Payload accessor (for task-specific output data)
+ # -------------------------------------------------------------------------
+
+ def get_payload(self) -> dict[str, Any]:
+ """
+ Get payload as parsed JSON.
+
+ Payload contains task-specific output data set by task code via
+ ctx.update_task(payload=...).
+
+ :returns: Dictionary containing payload data
+ """
+ try:
+ return json.loads(self.payload or "{}")
+ except (json.JSONDecodeError, TypeError):
+ return {}
+
+ def set_payload(self, data: dict[str, Any]) -> None:
+ """
+ Update payload with new data.
+
+ The payload is merged with existing data, not replaced.
+
+ :param data: Dictionary of data to merge into payload
+ """
+ current = self.get_payload()
+ current.update(data)
+ self.payload = json.dumps(current)
+
+ # -------------------------------------------------------------------------
+ # Error handling
+ # -------------------------------------------------------------------------
+
+ def set_error_from_exception(self, exception: BaseException) -> None:
+ """
+ Set error fields from an exception.
+
+ Captures the error message, exception type, and full stack trace.
+ Called automatically by the executor when a task raises an exception.
+
+ :param exception: The exception that caused the failure
+ """
+ self.update_properties(error_update(exception))
+
+ # -------------------------------------------------------------------------
+ # Status management
+ # -------------------------------------------------------------------------
+
+ def set_status(self, status: TaskStatus | str) -> None:
+ """
+ Update task status and dedup_key.
+
+ When a task finishes (success, failure, or abort), the dedup_key is
+ changed to the task's UUID. This frees up the slot so new tasks with
+ the same parameters can be created.
+
+ :param status: New task status
+ """
+ if isinstance(status, TaskStatus):
+ status = status.value
+ self.status = status
+
+ # Update timestamps and is_abortable based on status
+ now = datetime.now(timezone.utc)
+ if status == TaskStatus.IN_PROGRESS.value and not self.started_at:
+ self.started_at = now
+ # Set is_abortable to False when task starts executing
+ # (will be set to True if/when an abort handler is registered)
+ if self.properties.get("is_abortable") is None:
+ self.update_properties({"is_abortable": False})
+ elif status in [
+ TaskStatus.SUCCESS.value,
+ TaskStatus.FAILURE.value,
+ TaskStatus.ABORTED.value,
+ TaskStatus.TIMED_OUT.value,
+ ]:
+ if not self.ended_at:
+ self.ended_at = now
+ # Update dedup_key to UUID to free up the slot for new tasks
+ self.dedup_key = get_finished_dedup_key(self.uuid)
+ # Note: ABORTING status doesn't set ended_at yet - that happens when
+ # the task transitions to ABORTED after handlers complete
+
+ @property
+ def is_pending(self) -> bool:
+ """Check if task is pending."""
+ return self.status == TaskStatus.PENDING.value
+
+ @property
+ def is_running(self) -> bool:
+ """Check if task is currently running."""
+ return self.status == TaskStatus.IN_PROGRESS.value
+
+ @property
+ def is_finished(self) -> bool:
+ """Check if task has finished (success, failure, aborted, or timed
out)."""
+ return self.status in [
+ TaskStatus.SUCCESS.value,
+ TaskStatus.FAILURE.value,
+ TaskStatus.ABORTED.value,
+ TaskStatus.TIMED_OUT.value,
+ ]
+
+ @property
+ def is_successful(self) -> bool:
+ """Check if task completed successfully."""
+ return self.status == TaskStatus.SUCCESS.value
+
+ @property
+ def duration_seconds(self) -> float | None:
+ """
+ Get task duration in seconds.
+
+ - Finished tasks: Time from started_at to ended_at (None if never started)
+ - Running/aborting tasks: Time from started_at to now
+ - Pending tasks: Time from created_on to now (queue time)
+
+ Note: started_at/ended_at are stored in UTC, but created_on from
+ AuditMixinNullable is stored as naive local time. We handle both cases.
+ """
+ if self.is_finished:
+ # Task has completed - use fixed timestamps, never increment
+ if self.started_at and self.ended_at:
+ # Finished task - both timestamps use the same timezone (UTC)
+ # Just compute the difference directly
+ return (self.ended_at - self.started_at).total_seconds()
+ # Never started (e.g., aborted while pending) - no duration
+ return None
+ elif self.started_at:
+ # Running or aborting - started_at is UTC (set by set_status)
+ # Use UTC now for comparison
+ now = datetime.now(timezone.utc)
+ started = (
+ self.started_at.replace(tzinfo=timezone.utc)
+ if self.started_at.tzinfo is None
+ else self.started_at
+ )
+ return (now - started).total_seconds()
+ elif self.created_on:
+ # Pending - created_on is naive LOCAL time (from AuditMixinNullable)
+ # Use naive local time for comparison
+ now = datetime.now() # Local time, no timezone
+ created = (
+ self.created_on.replace(tzinfo=None)
+ if self.created_on.tzinfo is not None
+ else self.created_on
+ )
+ return (now - created).total_seconds()
+ return None
+
+ # Scope-related properties
+ @property
+ def is_private(self) -> bool:
+ """Check if task is private (user-specific)."""
+ return self.scope == "private"
+
+ @property
+ def is_shared(self) -> bool:
+ """Check if task is shared (multi-user)."""
+ return self.scope == "shared"
+
+ @property
+ def is_system(self) -> bool:
+ """Check if task is system (admin-only)."""
+ return self.scope == "system"
+
+ # Subscriber-related methods
+ @property
+ def subscriber_count(self) -> int:
+ """Get number of subscribers to this task."""
+ return len(self.subscribers)
+
+ def has_subscriber(self, user_id: int) -> bool:
+ """
+ Check if a user is subscribed to this task.
+
+ :param user_id: User ID to check
+ :returns: True if user is subscribed
+ """
+ return any(sub.user_id == user_id for sub in self.subscribers)
+
+ def get_subscriber_ids(self) -> list[int]:
+ """
+ Get list of all subscriber user IDs.
+
+ :returns: List of user IDs subscribed to this task
+ """
+ return [sub.user_id for sub in self.subscribers]
+
+ def to_dict(self) -> dict[str, Any]:
+ """
+ Convert task to dictionary representation.
+
+ Minimal API payload - frontend derives status booleans and abort logic
+ from status and properties.is_abortable.
+
+ :returns: Dictionary representation of the task
+ """
+ return {
+ "id": self.id,
+ "uuid": self.uuid,
+ "task_key": self.task_key,
+ "task_type": self.task_type,
+ "task_name": self.task_name,
+ "scope": self.scope,
+ "status": self.status,
+ "created_on": self.created_on.isoformat() if self.created_on else
None,
+ "changed_on": self.changed_on.isoformat() if self.changed_on else
None,
+ "started_at": self.started_at.isoformat() if self.started_at else
None,
+ "ended_at": self.ended_at.isoformat() if self.ended_at else None,
+ "created_by_fk": self.created_by_fk,
+ "user_id": self.user_id,
+ "payload": self.get_payload(),
+ "properties": self.properties, # Already a dict
Review Comment:
**Suggestion:** `set_error_from_exception` stores a full `stack_trace` in
`properties`, and `to_dict` returns `self.properties` verbatim, which means raw
stack traces (including file paths and possibly sensitive information) are
exposed to API consumers, representing an information disclosure vulnerability;
the API should strip stack traces from the serialized payload. [security]
<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>
```mdx
- ❌ API payload exposes internal stack traces.
- ⚠️ Error reporting leaks file paths and internals.
- ❌ Potential information disclosure to clients.
```
</details>
```suggestion
"properties": {
key: value
for key, value in self.properties.items()
if key != "stack_trace"
},
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Cause a task to raise an exception; the executor (per the docstring in
superset/models/tasks.py) will call set_error_from_exception(exception)
(superset/models/tasks.py:186-197).
2. set_error_from_exception calls update_properties(...)
(superset/models/tasks.py:196),
which uses error_update(exception) from superset/tasks/utils.py:275-286.
error_update
populates "stack_trace" via traceback.format_exc().
3. After the error, call task.to_dict() (superset/models/tasks.py:343-370).
The
implementation returns "properties": self.properties verbatim
(superset/models/tasks.py:366-368), which includes the "stack_trace" key and
full
traceback.
4. Observation: the returned dict (intended as a minimal API payload per the
to_dict()
docstring) contains raw stack traces and formatted tracebacks, exposing
internal file
paths and implementation details to API consumers/front-end.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/models/tasks.py
**Line:** 367:367
**Comment:**
*Security: `set_error_from_exception` stores a full `stack_trace` in
`properties`, and `to_dict` returns `self.properties` verbatim, which means raw
stack traces (including file paths and possibly sensitive information) are
exposed to API consumers, representing an information disclosure vulnerability;
the API should strip stack traces from the serialized payload.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
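If more sensitive keys are expected over time, a small denylist helper keeps the redaction in one place (a sketch under that assumption; `SENSITIVE_PROPERTY_KEYS` and `public_properties` are hypothetical names, not existing Superset APIs):

```python
from typing import Any

# Hypothetical helper; extend the set as new sensitive properties appear.
SENSITIVE_PROPERTY_KEYS = {"stack_trace"}

def public_properties(properties: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the properties dict that is safe to expose over the API."""
    return {k: v for k, v in properties.items() if k not in SENSITIVE_PROPERTY_KEYS}

# In Task.to_dict():
#     "properties": public_properties(self.properties),
```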
##########
superset/tasks/utils.py:
##########
@@ -142,3 +151,166 @@ def fetch_csrf_token(
logger.error("Error fetching CSRF token, status code: %s", response.status)
return {}
+
+
+def generate_random_task_key() -> str:
+ """
+ Generate a random task key.
+
+ This is the default behavior - each task submission gets a unique UUID
+ unless an explicit task_key is provided in TaskOptions.
+
+ :returns: A random UUID string
+ """
+ return str(uuid4())
+
+
+def get_active_dedup_key(
+ scope: TaskScope | str,
+ task_type: str,
+ task_key: str,
+ user_id: int | None = None,
+) -> str:
+ """
+ Build a deduplication key for active tasks.
+
+ The dedup_key enforces uniqueness at the database level via a unique index.
+ Active tasks use a composite key based on scope, which is then hashed using
+ the configured HASH_ALGORITHM to produce a fixed-length key.
+
+ The composite key format before hashing is:
+ - Private: private|task_type|task_key|user_id
+ - Shared: shared|task_type|task_key
+ - System: system|task_type|task_key
+
+ The final key is a hash digest (64 chars for sha256, 32 chars for md5).
+
+ :param scope: Task scope (PRIVATE/SHARED/SYSTEM) as TaskScope enum or string
+ :param task_type: Type of task (e.g., 'sql_execution')
+ :param task_key: Task identifier for deduplication
+ :param user_id: User ID for private tasks (falls back to g.user if not provided)
+ :returns: Hashed deduplication key string
+ :raises ValueError: If user_id is missing for private scope
+ """
+ # Convert string to TaskScope if needed
+ if isinstance(scope, str):
+ scope = TaskScope(scope)
+
+ # Build composite key
+ match scope:
+ case TaskScope.PRIVATE:
+ # Use provided user_id, or fall back to current user from Flask context
+ effective_user_id = user_id
+ if effective_user_id is None:
+ from superset.utils.core import get_user_id
+
+ effective_user_id = get_user_id()
+ if effective_user_id is None:
+ raise ValueError("user_id required for private tasks")
+ composite_key = f"{scope.value}|{task_type}|{task_key}|{effective_user_id}"
+ case TaskScope.SHARED:
+ composite_key = f"{scope.value}|{task_type}|{task_key}"
+ case TaskScope.SYSTEM:
+ composite_key = f"{scope.value}|{task_type}|{task_key}"
+ case _:
+ raise ValueError(f"Invalid scope: {scope}")
+
+ # Hash the composite key to produce a fixed-length dedup_key
+ # Truncate to 64 chars max to fit the database column in case
+ # a hash algo is used that generates hashes that exceed 64 chars
+ return hash_from_str(composite_key)[:64]
+
+
+def get_finished_dedup_key(task_uuid: str) -> str:
+ """
+ Build a deduplication key for finished tasks.
+
+ When a task completes (success, failure, or abort), its dedup_key is
+ changed to its UUID. This frees up the slot so new tasks with the same
+ parameters can be created.
+
+ :param task_uuid: Task UUID
+ :returns: The task UUID as the dedup key
+
+ Example:
+ >>> get_finished_dedup_key("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
+ 'a1b2c3d4-e5f6-7890-abcd-ef1234567890'
+ """
+ return task_uuid
+
+
+# -----------------------------------------------------------------------------
+# TaskProperties helper functions
+# -----------------------------------------------------------------------------
+
+
+def progress_update(progress: float | int | tuple[int, int]) -> TaskProperties:
+ """
+ Create a properties update dict for progress values.
+
+ :param progress: One of:
+ - float (0.0-1.0): Percentage only
+ - int: Count only (total unknown)
+ - tuple[int, int]: (current, total) with auto-computed percentage
+ :returns: TaskProperties dict with appropriate progress fields set
+
+ Example:
+ task.update_properties(progress_update((50, 100)))
+ """
+ if isinstance(progress, float):
+ return {"progress_percent": progress}
+ if isinstance(progress, int):
+ return {"progress_current": progress}
+ # tuple
+ current, total = progress
+ result: TaskProperties = {
+ "progress_current": current,
+ "progress_total": total,
+ }
+ if total > 0:
+ result["progress_percent"] = current / total
+ return result
+
+
+def error_update(exception: BaseException) -> TaskProperties:
+ """
+ Create a properties update dict from an exception.
+
+ :param exception: The exception that caused the failure
+ :returns: TaskProperties dict with error fields populated
+ """
+ return {
+ "error_message": str(exception),
+ "exception_type": type(exception).__name__,
+ "stack_trace": traceback.format_exc(),
+ }
+
+
+def parse_properties(json_str: str | None) -> TaskProperties:
+ """
+ Parse JSON string into TaskProperties dict.
+
+ Returns empty dict on parse errors. Unknown keys are preserved
+ for forward compatibility (allows adding new properties without
+ breaking existing code).
+
+ :param json_str: JSON string or None
+ :returns: TaskProperties dict (sparse - only contains keys that were set)
+ """
+ if not json_str:
+ return {}
+
+ try:
+ return json.loads(json_str)
+ except (json.JSONDecodeError, TypeError):
+ return {}
Review Comment:
**Suggestion:** The JSON parsing helper claims to return a `TaskProperties`
dict but directly returns the result of `json.loads`, which can be `None` or a
list (for inputs like `"null"` or `"[]"`), leading to type mismatches and
potential runtime errors when callers assume a dict. [type error]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ❌ Task properties parsing yields non-dict breaking callers.
- ⚠️ Task UI code expecting dict may raise TypeError.
- ⚠️ Data-reading code paths may mis-handle stored JSON.
```
</details>
```suggestion
value = json.loads(json_str)
except (json.JSONDecodeError, TypeError):
return {}
return value if isinstance(value, dict) else {}
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. In a Python REPL or unit test, import the helper at
`superset/tasks/utils.py:289`:
from superset.tasks.utils import parse_properties
2. Call the function with JSON that is valid but not an object, e.g. the
JSON literal
`null`:
result = parse_properties("null")
(This executes `superset/tasks/utils.py:289-306`.)
3. Observe `result` is None (or a non-dict) because `json.loads("null")`
returns None,
violating the expected TaskProperties dict contract.
4. Similarly, call with a JSON array: result = parse_properties("[]") and
observe a list
returned, which will break callers that assume dict-like access (e.g.,
calling .get()).
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/tasks/utils.py
**Line:** 304:306
**Comment:**
*Type Error: The JSON parsing helper claims to return a
`TaskProperties` dict but directly returns the result of `json.loads`, which
can be `None` or a list (for inputs like `"null"` or `"[]"`), leading to type
mismatches and potential runtime errors when callers assume a dict.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
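The behaviour is easy to confirm with the standard library `json` module (the in-repo `superset.utils.json` wrapper should behave the same way for these inputs, though that is an assumption):

```python
import json

# Valid JSON is not necessarily an object:
print(json.loads("null"))  # None
print(json.loads("[]"))    # []
print(json.loads('"x"'))   # 'x'

# An isinstance guard preserves the "always a dict" contract:
def parse(json_str: str | None) -> dict:
    if not json_str:
        return {}
    try:
        value = json.loads(json_str)
    except (json.JSONDecodeError, TypeError):
        return {}
    return value if isinstance(value, dict) else {}

assert parse("null") == {} and parse("[]") == {} and parse('{"a": 1}') == {"a": 1}
```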
##########
superset/migrations/versions/2025_12_18_0220_create_tasks_table.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Create tasks and task_subscriber tables for Global Task Framework (GTF)
+
+Revision ID: 4b2a8c9d3e1f
+Revises: 9787190b3d89
+Create Date: 2025-12-18 02:20:00.000000
+
+"""
+
+from sqlalchemy import (
+ Column,
+ DateTime,
+ Integer,
+ String,
+ Text,
+ UniqueConstraint,
+)
+
+from superset.migrations.shared.utils import (
+ create_fks_for_table,
+ create_index,
+ create_table,
+ drop_fks_for_table,
+ drop_index,
+ drop_table,
+)
+
+# revision identifiers, used by Alembic.
+revision = "4b2a8c9d3e1f"
+down_revision = "9787190b3d89"
+
+TASKS_TABLE = "tasks"
+TASK_SUBSCRIBERS_TABLE = "task_subscribers"
+
+
+def upgrade():
+ """
+ Create tasks and task_subscribers tables for the Global Task Framework (GTF).
+
+ This migration creates:
+ 1. tasks table - unified tracking for all long running tasks
+ 2. task_subscribers table - multi-user task subscriptions for shared tasks
+
+ The scope feature allows tasks to be:
+ - private: user-specific (default)
+ - shared: multi-user collaborative tasks
+ - system: admin-only background tasks
+ """
+ # Create tasks table
+ create_table(
+ TASKS_TABLE,
+ Column("id", Integer, primary_key=True),
+ Column("uuid", String(36), nullable=False, unique=True),
+ Column("task_key", String(256), nullable=False),
+ Column("task_type", String(100), nullable=False),
+ Column("task_name", String(256), nullable=True),
+ Column("scope", String(20), nullable=False, server_default="private"),
+ Column("status", String(50), nullable=False),
+ Column("dedup_key", String(64), nullable=False),
+ # AuditMixinNullable columns
+ Column("created_on", DateTime, nullable=True),
+ Column("changed_on", DateTime, nullable=True),
+ Column("created_by_fk", Integer, nullable=True),
+ Column("changed_by_fk", Integer, nullable=True),
+ # Task-specific columns
+ Column("started_at", DateTime, nullable=True),
+ Column("ended_at", DateTime, nullable=True),
+ Column("user_id", Integer, nullable=True),
+ Column("payload", Text, nullable=True),
+ Column("properties", Text, nullable=True),
+ )
+
+ # Create indexes for optimal query performance
+ create_index(TASKS_TABLE, "idx_tasks_dedup_key", ["dedup_key"],
unique=True)
+ create_index(TASKS_TABLE, "idx_tasks_status", ["status"])
+ create_index(TASKS_TABLE, "idx_tasks_scope", ["scope"])
+ create_index(TASKS_TABLE, "idx_tasks_ended_at", ["ended_at"])
+ create_index(TASKS_TABLE, "idx_tasks_created_by", ["created_by_fk"])
+ create_index(TASKS_TABLE, "idx_tasks_created_on", ["created_on"])
+ create_index(TASKS_TABLE, "idx_tasks_task_type", ["task_type"])
+ create_index(TASKS_TABLE, "idx_tasks_uuid", ["uuid"], unique=True)
+
+ # Create foreign key constraints for tasks
+ create_fks_for_table(
+ foreign_key_name="fk_tasks_created_by_fk_ab_user",
+ table_name=TASKS_TABLE,
+ referenced_table="ab_user",
+ local_cols=["created_by_fk"],
+ remote_cols=["id"],
+ ondelete="SET NULL",
+ )
+
+ create_fks_for_table(
+ foreign_key_name="fk_tasks_changed_by_fk_ab_user",
+ table_name=TASKS_TABLE,
+ referenced_table="ab_user",
+ local_cols=["changed_by_fk"],
+ remote_cols=["id"],
+ ondelete="SET NULL",
+ )
+
Review Comment:
**Suggestion:** The `user_id` column on the tasks table is defined as an
integer without a foreign key constraint, unlike other user-related columns,
which can lead to orphaned task records when users are deleted or IDs change
and makes data integrity inconsistent with `task_subscribers.user_id`. [logic
error]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ❌ Tasks retain deleted user IDs (orphaned references).
- ⚠️ Inconsistent integrity vs task_subscribers.user_id FK.
- ⚠️ Reporting or joins may show stale task ownership.
```
</details>
```suggestion
create_fks_for_table(
foreign_key_name="fk_tasks_user_id_ab_user",
table_name=TASKS_TABLE,
referenced_table="ab_user",
local_cols=["user_id"],
remote_cols=["id"],
ondelete="SET NULL",
)
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Apply the migration by running Alembic upgrade that executes upgrade() in
this file
(`superset/migrations/versions/2025_12_18_0220_create_tasks_table.py:51`-`86`).
The tasks
table is created with Column("user_id", Integer, nullable=True) at line `83`.
2. Insert a task row referencing an existing user id: run SQL INSERT INTO
tasks (uuid,
task_key, task_type, status, dedup_key, user_id) VALUES (..., user_id=42)
against the
database. This exercise touches the table schema created at
`create_table(...)` in the
same file (`lines ~65-86`).
3. Delete that user from the users table (ab_user): e.g. DELETE FROM ab_user
WHERE id =
42. Because there is no foreign key on tasks.user_id, the tasks row is not
updated or
removed.
4. Observe an orphaned tasks.user_id value (stale reference) when querying
tasks (SELECT
user_id FROM tasks WHERE uuid = ...). Note: other user-related columns
(created_by_fk/changed_by_fk) have FKs created via create_fks_for_table
(lines `99`-`115`)
with ondelete="SET NULL", demonstrating inconsistent constraints vs user_id.
Explanation: The current pattern intentionally creates FKs for
created_by_fk/changed_by_fk
(see `create_fks_for_table` calls at lines `99`-`115`) but omits a FK for
`user_id`, so
deleting users leaves orphaned user_id values. This reproduces
deterministically using the
migration-created schema and basic SQL operations.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/migrations/versions/2025_12_18_0220_create_tasks_table.py
**Line:** 116:116
**Comment:**
*Logic Error: The `user_id` column on the tasks table is defined as an
integer without a foreign key constraint, unlike other user-related columns,
which can lead to orphaned task records when users are deleted or IDs change
and makes data integrity inconsistent with `task_subscribers.user_id`.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
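If the table has already been created in some environment before this change lands, the constraint could alternatively be added in a small follow-up migration (a sketch with plain Alembic operations; the revision wiring is omitted and the constraint name mirrors the suggestion above):

```python
from alembic import op

def upgrade() -> None:
    op.create_foreign_key(
        "fk_tasks_user_id_ab_user",  # constraint name
        "tasks",                     # source table
        "ab_user",                   # referenced table
        ["user_id"],
        ["id"],
        ondelete="SET NULL",
    )

def downgrade() -> None:
    op.drop_constraint("fk_tasks_user_id_ab_user", "tasks", type_="foreignkey")
```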
##########
superset/models/tasks.py:
##########
@@ -0,0 +1,370 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task model for Global Task Framework (GTF)"""
+
+from __future__ import annotations
+
+import uuid
+from datetime import datetime, timezone
+from typing import Any, cast
+
+from flask_appbuilder import Model
+from sqlalchemy import (
+ Column,
+ DateTime,
+ Integer,
+ String,
+ Text,
+)
+from sqlalchemy.orm import relationship
+from superset_core.api.models import Task as CoreTask
+from superset_core.api.tasks import TaskProperties, TaskStatus
+
+from superset.models.helpers import AuditMixinNullable
+from superset.models.task_subscribers import TaskSubscriber
+from superset.tasks.utils import (
+ error_update,
+ get_finished_dedup_key,
+ parse_properties,
+ serialize_properties,
+)
+from superset.utils import json
+
+
+class Task(CoreTask, AuditMixinNullable, Model):
+ """
+ Concrete Task model for the Global Task Framework (GTF).
+
+ This model represents async tasks in Superset, providing unified tracking
+ for all background operations including SQL queries, thumbnail generation,
+ reports, and other async operations.
+
+ Non-filterable fields (progress, error info, execution config) are stored
+ in a `properties` JSON blob for schema flexibility.
+ """
+
+ __tablename__ = "tasks"
+
+ # Primary key and identifiers
+ id = Column(Integer, primary_key=True)
+ uuid = Column(
+ String(36), nullable=False, unique=True, default=lambda: str(uuid.uuid4())
+ )
+
+ # Task metadata (filterable)
+ task_key = Column(String(256), nullable=False, index=True) # For deduplication
+ task_type = Column(String(100), nullable=False, index=True) # e.g., 'sql_execution'
+ task_name = Column(String(256), nullable=True) # Human readable name
+ scope = Column(
+ String(20), nullable=False, index=True, default="private"
+ ) # private/shared/system
+ status = Column(
+ String(50), nullable=False, index=True, default=TaskStatus.PENDING.value
+ )
+ dedup_key = Column(
+ String(64), nullable=False, unique=True, index=True
+ ) # Hashed deduplication key (SHA-256 = 64 chars, UUID = 36 chars)
+
+ # Timestamps
+ started_at = Column(DateTime, nullable=True)
+ ended_at = Column(DateTime, nullable=True)
+
+ # User context for execution
+ user_id = Column(Integer, nullable=True)
+
+ # Task-specific output data (set by task code via ctx.update_task(payload=...))
+ payload = Column(Text, nullable=True, default="{}")
+
+ # Properties JSON blob - contains runtime state and execution config:
+ # - is_abortable: bool - has abort handler registered
+ # - progress_percent: float - progress 0.0-1.0
+ # - progress_current: int - current iteration count
+ # - progress_total: int - total iterations
+ # - error_message: str - human-readable error message
+ # - exception_type: str - exception class name
+ # - stack_trace: str - full formatted traceback
+ # - timeout: int - timeout in seconds
+ _properties = Column("properties", Text, nullable=True, default="{}")
+
+ # Transient cache for parsed properties (not persisted)
+ _properties_cache: TaskProperties | None = None
+
+ # Relationships
+ subscribers = relationship(
+ TaskSubscriber,
+ back_populates="task",
+ cascade="all, delete-orphan",
+ )
+
+ def __repr__(self) -> str:
+ return f"<Task {self.task_type}:{self.task_key} [{self.status}]>"
+
+ # -------------------------------------------------------------------------
+ # Properties accessor
+ # -------------------------------------------------------------------------
+
+ @property
+ def properties(self) -> TaskProperties:
+ """
+ Get typed properties (cached for performance).
+
+ Properties contain runtime state and execution config that doesn't
+ need database filtering. Parsed once and cached until next write.
+
+ Always use .get() for reads since keys may be absent.
+
+ :returns: TaskProperties dict (sparse - only contains keys that were set)
+ """
+ if self._properties_cache is None:
+ self._properties_cache = parse_properties(self._properties)
+ return self._properties_cache
Review Comment:
**Suggestion:** The `properties` accessor returns the internal cached
dictionary directly, so any caller that mutates the returned dict will update
the in-memory cache but not the serialized `_properties` column, leading to
confusing, inconsistent state where changes appear to "stick" on the object but
are never persisted to the database unless `update_properties` is explicitly
called. [logic error]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ In-memory Task properties differ from persisted DB column.
- ⚠️ Task updater code may rely on persisted properties.
- ⚠️ Tests can observe flaky state when mutating properties.
```
</details>
```suggestion
# Return a copy so external mutation doesn't desynchronize cache and DB value
return cast(TaskProperties, dict(self._properties_cache))
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. In a Python REPL or unit test import Task from superset.models.tasks and
create/persist
a Task instance (e.g., instantiate Task(...) and commit via SQLAlchemy). The
properties
accessor implementation is at superset/models/tasks.py:120-134.
2. Load the persisted Task (SQLAlchemy query) and call the accessor at
superset/models/tasks.py:121-134:
props = task.properties # returns the internal cached dict (line 133-134)
3. Mutate the returned dict in place:
props["new_key"] = "value"
4. Commit the SQLAlchemy session and inspect task._properties (the DB-backed
column).
Because only update_properties (superset/models/tasks.py:136-151) calls
serialize_properties (superset/tasks/utils.py:309-316) to write to
_properties, the
in-memory mutation will not be serialized or persisted. The object will show
the change in
memory but the database column (and future new object loads) will not
reflect it.
5. Observation: direct mutation appears to "stick" on the in-memory Task
instance, but is
not persisted unless update_properties(...)
(superset/models/tasks.py:136-151) or an
explicit write path is used.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/models/tasks.py
**Line:** 134:134
**Comment:**
*Logic Error: The `properties` accessor returns the internal cached
dictionary directly, so any caller that mutates the returned dict will update
the in-memory cache but not the serialized `_properties` column, leading to
confusing, inconsistent state where changes appear to "stick" on the object but
are never persisted to the database unless `update_properties` is explicitly
called.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
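A stripped-down stand-in (not the actual model) showing why handing out the cache is misleading:

```python
import json

class Holder:
    """Toy stand-in for the Task properties accessor."""

    def __init__(self) -> None:
        self._properties = "{}"          # the DB-backed column
        self._cache: dict | None = None  # transient cache

    @property
    def properties(self) -> dict:
        if self._cache is None:
            self._cache = json.loads(self._properties)
        return self._cache  # hands out the cache itself

h = Holder()
h.properties["progress_percent"] = 0.5   # mutates the shared cache
print(h.properties)   # {'progress_percent': 0.5} - change appears to stick
print(h._properties)  # '{}' - the serialized column never changed
```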
##########
tests/integration_tests/tasks/api_tests.py:
##########
@@ -0,0 +1,611 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Integration tests for Task REST API"""
+
+from contextlib import contextmanager
+from typing import Generator
+
+import prison
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.models.tasks import Task
+from superset.utils import json
+from tests.integration_tests.base_tests import SupersetTestCase
+from tests.integration_tests.constants import (
+ ADMIN_USERNAME,
+ GAMMA_USERNAME,
+)
+
+
+class TestTaskApi(SupersetTestCase):
+ """Tests for Task REST API"""
+
+ TASK_API_BASE = "api/v1/task"
Review Comment:
**Suggestion:** The tests call `self.get_user(...)` assuming it is provided
by `SupersetTestCase`, but `get_user` is actually a module-level helper in
`tests.integration_tests.base_tests`, so at runtime `TestTaskApi` instances
will raise `AttributeError` when trying to access `self.get_user`; you can fix
this by importing the helper and adding a wrapper method on the test class.
[possible bug]
<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>
```mdx
- ❌ Integration tests tasks API fail with AttributeError.
- ⚠️ CI job running integration tests will be blocked.
- ⚠️ Test coverage for Global Task Framework missing.
```
</details>
```suggestion
from tests.integration_tests.base_tests import SupersetTestCase, get_user as base_get_user
from tests.integration_tests.constants import (
ADMIN_USERNAME,
GAMMA_USERNAME,
)
class TestTaskApi(SupersetTestCase):
"""Tests for Task REST API"""
TASK_API_BASE = "api/v1/task"
def get_user(self, username: str):
"""
Convenience wrapper to use the shared get_user helper in tests.
"""
return base_get_user(username)
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Run any test in this file, e.g. `pytest
tests/integration_tests/tasks/api_tests.py::TestTaskApi::test_get_task_by_id`.
2. Test execution enters TestTaskApi._create_tasks
(tests/integration_tests/tasks/api_tests.py:41) and executes the line `admin
=
self.get_user("admin")` (tests/integration_tests/tasks/api_tests.py:56).
3. Python looks up `get_user` on the TestTaskApi instance (inherited from
SupersetTestCase). SupersetTestCase does not define a `get_user` method (see
tests/integration_tests/base_tests.py); instead a module-level helper
`get_user(username:
str)` exists at tests/integration_tests/base_tests.py:256-262.
4. Because `self.get_user` is missing, the call raises AttributeError and
the test fails
immediately with a traceback pointing to
tests/integration_tests/tasks/api_tests.py:56.
The proposed wrapper (importing base-level get_user and adding an instance
method)
prevents this.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** tests/integration_tests/tasks/api_tests.py
**Line:** 28:38
**Comment:**
*Possible Bug: The tests call `self.get_user(...)` assuming it is
provided by `SupersetTestCase`, but `get_user` is actually a module-level
helper in `tests.integration_tests.base_tests`, so at runtime `TestTaskApi`
instances will raise `AttributeError` when trying to access `self.get_user`;
you can fix this by importing the helper and adding a wrapper method on the
test class.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
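Alternatively, if adding a wrapper method is undesirable, Superset integration tests often resolve users through Flask-AppBuilder's security manager (assuming that pattern fits here):

```python
from superset import security_manager

# Look users up directly instead of relying on a test-case helper.
admin = security_manager.find_user(username="admin")
gamma = security_manager.find_user(username="gamma")
```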
##########
tests/unit_tests/tasks/test_utils.py:
##########
@@ -330,3 +340,248 @@ def test_get_executor(
)
assert executor_type == expected_executor_type
assert executor == expected_executor
+
+
[email protected](
+ "scope,task_type,task_key,user_id,expected_composite_key",
+ [
+ # Private tasks with TaskScope enum
+ (
+ TaskScope.PRIVATE,
+ "sql_execution",
+ "chart_123",
+ 42,
+ "private|sql_execution|chart_123|42",
+ ),
+ (
+ TaskScope.PRIVATE,
+ "thumbnail_gen",
+ "dash_456",
+ 100,
+ "private|thumbnail_gen|dash_456|100",
+ ),
+ # Private tasks with string scope
+ (
+ "private",
+ "api_call",
+ "endpoint_789",
+ 200,
+ "private|api_call|endpoint_789|200",
+ ),
+ # Shared tasks with TaskScope enum
+ (
+ TaskScope.SHARED,
+ "report_gen",
+ "monthly_report",
+ None,
+ "shared|report_gen|monthly_report",
+ ),
+ (
+ TaskScope.SHARED,
+ "export_csv",
+ "large_export",
+ 999, # user_id should be ignored for shared
+ "shared|export_csv|large_export",
+ ),
+ # Shared tasks with string scope
+ (
+ "shared",
+ "batch_process",
+ "batch_001",
+ 123, # user_id should be ignored for shared
+ "shared|batch_process|batch_001",
+ ),
+ # System tasks with TaskScope enum
+ (
+ TaskScope.SYSTEM,
+ "cleanup_task",
+ "daily_cleanup",
+ None,
+ "system|cleanup_task|daily_cleanup",
+ ),
+ (
+ TaskScope.SYSTEM,
+ "db_migration",
+ "version_123",
+ 1, # user_id should be ignored for system
+ "system|db_migration|version_123",
+ ),
+ # System tasks with string scope
+ (
+ "system",
+ "maintenance",
+ "nightly_job",
+ 2, # user_id should be ignored for system
+ "system|maintenance|nightly_job",
+ ),
+ ],
+)
+def test_get_active_dedup_key(
+ scope, task_type, task_key, user_id, expected_composite_key, mocker, app_context
+):
+ """Test get_active_dedup_key generates a hash of the composite key.
+
+ The function hashes the composite key using the configured HASH_ALGORITHM
+ to produce a fixed-length dedup_key for database storage. The result is
+ truncated to 64 chars to fit the database column.
+ """
+ # Mock get_user_id to return the specified user_id
+ mocker.patch("superset.utils.core.get_user_id", return_value=user_id)
Review Comment:
**Suggestion:** The test mocks `get_user_id` on the `superset.utils.core`
module, but the function under test `get_active_dedup_key` is imported from
`superset.tasks.utils`, so if that module imported `get_user_id` directly (e.g.
`from superset.utils.core import get_user_id`), this patch will not affect the
function call and the test will read the real user ID instead of the mocked
one, making the test brittle or incorrect depending on the environment.
[possible bug]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Unit test for dedup key may be flaky in CI.
- ⚠️ tests/unit_tests/tasks/test_utils.py affected (get_active_dedup_key).
```
</details>
```suggestion
mocker.patch("superset.tasks.utils.get_user_id", return_value=user_id)
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Open the test at tests/unit_tests/tasks/test_utils.py:419 (definition of
test_get_active_dedup_key). Note the mock is applied at lines 428-429.
2. Inspect superset/tasks/utils.py to determine how get_user_id is
referenced. If that
module did `from superset.utils.core import get_user_id` then calls within
tasks.utils
resolve to superset.tasks.utils.get_user_id (check top-of-file imports in
superset/tasks/utils.py).
3. Run pytest for this single test: pytest
tests/unit_tests/tasks/test_utils.py::test_get_active_dedup_key - observe
behavior. The
test patches superset.utils.core.get_user_id
(tests/unit_tests/tasks/test_utils.py:429)
but not superset.tasks.utils.get_user_id.
4. If tasks.utils imported get_user_id into its own namespace,
get_active_dedup_key will
call the unpatched local reference and the test may observe the real user id
(causing
wrong hash or test failure). If tasks.utils uses
superset.utils.core.get_user_id at call
time (no direct import), the existing patch will succeed. This reproduces
the brittle
behavior and identifies whether the patch target is incorrect.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** tests/unit_tests/tasks/test_utils.py
**Line:** 429:429
**Comment:**
*Possible Bug: The test mocks `get_user_id` on the
`superset.utils.core` module, but the function under test
`get_active_dedup_key` is imported from `superset.tasks.utils`, so if that
module imported `get_user_id` directly (e.g. `from superset.utils.core import
get_user_id`), this patch will not affect the function call and the test will
read the real user ID instead of the mocked one, making the test brittle or
incorrect depending on the environment.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
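For context on the suggestion above: `unittest.mock` (and therefore `mocker.patch` from pytest-mock) replaces the attribute on the module where the name is *looked up* at call time, not where it is defined. The sketch below is illustrative only; it uses two fake in-memory modules rather than the real Superset modules, and the actual behavior in this PR depends on whether `superset/tasks/utils.py` binds `get_user_id` at module import time or imports it inside the function at call time (the latter would make the original patch target effective).

```python
# Minimal, self-contained illustration of "patch where it is used".
# "fake_core" and "fake_tasks" are stand-ins for a defining and a
# consuming module; they are NOT real Superset modules.
import sys
import types
from unittest import mock

# Defining module exposes get_user_id().
core = types.ModuleType("fake_core")
core.get_user_id = lambda: 7
sys.modules["fake_core"] = core

# Consuming module copies the name into its own namespace at import time.
tasks = types.ModuleType("fake_tasks")
sys.modules["fake_tasks"] = tasks
exec(
    "from fake_core import get_user_id\n"
    "def dedup_key():\n"
    "    return f'user-{get_user_id()}'\n",
    tasks.__dict__,
)

# Patching the defining module does not affect the copy already bound
# in fake_tasks, so the real value leaks through.
with mock.patch("fake_core.get_user_id", return_value=99):
    assert tasks.dedup_key() == "user-7"

# Patching the consuming module's binding is what the function observes.
with mock.patch("fake_tasks.get_user_id", return_value=99):
    assert tasks.dedup_key() == "user-99"
```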
##########
superset/daos/tasks.py:
##########
@@ -0,0 +1,309 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task DAO for Global Task Framework (GTF)"""
+
+import logging
+from datetime import datetime, timezone
+from typing import Any
+
+from superset_core.api.tasks import TaskProperties, TaskScope, TaskStatus
+
+from superset.daos.base import BaseDAO
+from superset.daos.exceptions import DAODeleteFailedError
+from superset.extensions import db
+from superset.models.task_subscribers import TaskSubscriber
+from superset.models.tasks import Task
+from superset.tasks.constants import ABORTABLE_STATES
+from superset.tasks.filters import TaskFilter
+from superset.tasks.utils import get_active_dedup_key
+
+logger = logging.getLogger(__name__)
+
+
+class TaskDAO(BaseDAO[Task]):
+ """
+ Concrete TaskDAO for the Global Task Framework (GTF).
+
+ Provides database access operations for async tasks including
+ creation, status management, filtering, and subscription management
+ for shared tasks.
+ """
+
+ base_filter = TaskFilter
+
+ @classmethod
+ def find_by_task_key(
+ cls,
+ task_type: str,
+ task_key: str,
+ scope: TaskScope | str = TaskScope.PRIVATE,
+ user_id: int | None = None,
+ ) -> Task | None:
+ """
+ Find active task by type, key, scope, and user.
+
+ Uses dedup_key internally for efficient querying with a unique index.
+ Only returns tasks that are active (pending or in progress).
+
+ Uniqueness logic by scope:
+ - private: scope + task_type + task_key + user_id
+ - shared/system: scope + task_type + task_key (user-agnostic)
+
+ :param task_type: Task type to filter by
+ :param task_key: Task identifier for deduplication
+ :param scope: Task scope (private/shared/system)
+ :param user_id: User ID (required for private tasks)
+ :returns: Task instance or None if not found or not active
+ """
+ dedup_key = get_active_dedup_key(
+ scope=scope,
+ task_type=task_type,
+ task_key=task_key,
+ user_id=user_id,
+ )
+
+ # Simple single-column query with unique index
+ return db.session.query(Task).filter(Task.dedup_key == dedup_key).one_or_none()
+
+ @classmethod
+ def create_task(
+ cls,
+ task_type: str,
+ task_key: str,
+ scope: TaskScope | str = TaskScope.PRIVATE,
+ user_id: int | None = None,
+ payload: dict[str, Any] | None = None,
+ properties: TaskProperties | None = None,
+ **kwargs: Any,
+ ) -> Task:
+ """
+ Create a new task record in the database.
+
+ This is a pure data operation - assumes caller holds lock and has
+ already checked for existing tasks. Business logic (create vs join)
+ is handled by SubmitTaskCommand.
+
+ :param task_type: Type of task to create
+ :param task_key: Task identifier (required)
+ :param scope: Task scope (private/shared/system), defaults to private
+ :param user_id: User ID creating the task
+ :param payload: Optional user-defined context data (dict)
+ :param properties: Optional framework-managed runtime state (e.g., timeout)
+ :param kwargs: Additional task attributes (e.g., task_name)
+ :returns: Created Task instance
+ """
+ # Build dedup_key for active task
+ dedup_key = get_active_dedup_key(
+ scope=scope,
+ task_type=task_type,
+ task_key=task_key,
+ user_id=user_id,
+ )
+
+ # Handle both TaskScope enum and string values
+ scope_value = scope.value if isinstance(scope, TaskScope) else scope
+
+ # Note: properties is handled separately via update_properties()
+ # because it's a hybrid property with only a getter
+ task_data = {
+ "task_type": task_type,
+ "task_key": task_key,
+ "scope": scope_value,
+ "status": TaskStatus.PENDING.value,
+ "dedup_key": dedup_key,
+ **kwargs,
+ }
+
+ # Handle payload - serialize to JSON if dict provided
+ if payload:
+ from superset.utils import json
+
+ task_data["payload"] = json.dumps(payload)
+
+ if user_id is not None:
+ task_data["user_id"] = user_id
+
+ task = cls.create(attributes=task_data)
+
+ # Set properties after creation (hybrid property with getter only)
+ if properties:
+ task.update_properties(properties)
+
+ # Flush to get the task ID (auto-incremented primary key)
+ db.session.flush()
+
+ # Auto-subscribe creator for all tasks
+ # This enables consistent subscriber display across all task types
+ if user_id:
+ cls.add_subscriber(task.id, user_id)
+ logger.info(
+ "Creator %s auto-subscribed to task: %s (scope: %s)",
+ user_id,
+ task_key,
+ scope_value,
+ )
+
+ logger.info(
+ "Created new async task: %s (type: %s, scope: %s)",
+ task_key,
+ task_type,
+ scope_value,
+ )
+ return task
+
+ @classmethod
+ def abort_task(cls, task_uuid: str, skip_base_filter: bool = False) -> Task | None:
+ """
+ Abort a task by UUID.
+
+ This is a pure data operation. Business logic (subscriber count checks,
+ permission validation) is handled by CancelTaskCommand which holds the lock.
+
+ Abort behavior by status:
+ - PENDING: Goes directly to ABORTED (always abortable)
+ - IN_PROGRESS with is_abortable=True: Goes to ABORTING
+ - IN_PROGRESS with is_abortable=False/None: Raises TaskNotAbortableError
+ - ABORTING: Returns task (idempotent)
+ - Finished statuses: Returns None
+
+ Note: Caller is responsible for calling TaskManager.publish_abort() AFTER
+ the transaction commits if task.status == ABORTING. This prevents race
+ conditions where listeners check the DB before the status is visible.
+
+ :param task_uuid: UUID of task to abort
+ :param skip_base_filter: If True, skip base filter (for admin abortions)
+ :returns: Task if aborted/aborting, None if not found or already finished
+ :raises TaskNotAbortableError: If in-progress task has no abort handler
+ """
+ from superset.commands.tasks.exceptions import TaskNotAbortableError
+
+ task = cls.find_one_or_none(skip_base_filter=skip_base_filter, uuid=task_uuid)
+ if not task:
+ return None
+
+ # Already aborting - idempotent success
+ if task.status == TaskStatus.ABORTING.value:
+ logger.info("Task %s is already aborting", task_uuid)
+ return task
+
+ # Already finished - cannot abort
+ if task.status not in ABORTABLE_STATES:
+ return None
+
+ # PENDING: Go directly to ABORTED
+ if task.status == TaskStatus.PENDING.value:
+ task.set_status(TaskStatus.ABORTED)
+ logger.info("Aborted pending task: %s (scope: %s)", task_uuid,
task.scope)
+ return task
+
+ # IN_PROGRESS: Check if abortable
+ if task.status == TaskStatus.IN_PROGRESS.value:
+ if task.properties.get("is_abortable") is not True:
+ raise TaskNotAbortableError(
+ f"Task {task_uuid} is in progress but has not registered "
+ "an abort handler (is_abortable is not true)"
+ )
+
+ # Transition to ABORTING (not ABORTED yet)
+ task.status = TaskStatus.ABORTING.value
+ db.session.merge(task)
+ logger.info("Set task %s to ABORTING (scope: %s)", task_uuid,
task.scope)
+
+ # NOTE: publish_abort is NOT called here - caller handles it after commit
+ # This prevents race conditions where listeners check DB before commit
+
+ return task
+
+ return None
+
+ # Subscription management methods
+
+ @classmethod
+ def add_subscriber(cls, task_id: int, user_id: int) -> bool:
+ """
+ Add a user as a subscriber to a task.
+
+ :param task_id: ID of the task
+ :param user_id: ID of the user to subscribe
+ :returns: True if subscriber was added, False if already exists
+ """
+ # Check first to avoid IntegrityError which invalidates the session
+ # in nested transaction contexts (IntegrityError can't be recovered from)
+ existing = (
+ db.session.query(TaskSubscriber)
+ .filter_by(task_id=task_id, user_id=user_id)
+ .first()
+ )
+ if existing:
+ logger.debug(
+ "Subscriber %s already subscribed to task %s", user_id, task_id
+ )
+ return False
+
+ subscription = TaskSubscriber(
+ task_id=task_id,
+ user_id=user_id,
+ subscribed_at=datetime.now(timezone.utc),
+ )
+ db.session.add(subscription)
+ db.session.flush()
Review Comment:
**Suggestion:** The subscriber addition logic checks for existence before
inserting, but under concurrent calls a unique constraint violation can still
occur on flush, raising an IntegrityError that leaves the session in a failed
state and breaks the transaction. The fix is to catch IntegrityError around
flush (after the add) and treat it as an "already subscribed" case while
rolling back the failed operation to keep the session usable. [race condition]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ❌ Concurrent subscription attempts can cause session failure.
- ⚠️ Task creation auto-subscribe may trigger IntegrityError.
- ⚠️ Unhandled DB error surfaces as 500 responses.
```
</details>
```suggestion
from sqlalchemy.exc import IntegrityError
# Check first to avoid IntegrityError which invalidates the session
# in nested transaction contexts (IntegrityError can't be recovered from)
existing = (
db.session.query(TaskSubscriber)
.filter_by(task_id=task_id, user_id=user_id)
.first()
)
if existing:
logger.debug(
"Subscriber %s already subscribed to task %s", user_id,
task_id
)
return False
subscription = TaskSubscriber(
task_id=task_id,
user_id=user_id,
subscribed_at=datetime.now(timezone.utc),
)
db.session.add(subscription)
try:
db.session.flush()
except IntegrityError:
db.session.rollback()
logger.debug(
"Subscriber %s already subscribed to task %s
(IntegrityError)",
user_id,
task_id,
)
return False
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Inspect add_subscriber implementation at superset/daos/tasks.py:236-265 —
it checks
existence then adds and flushes without handling IntegrityError.
2. Trigger two concurrent subscribe attempts for the same (task_id, user_id)
pair (for
example: simultaneous HTTP requests or concurrent workers calling
TaskDAO.add_subscriber
during task creation/auto-subscription at superset/daos/tasks.py:140
create_task ->
add_subscriber).
3. Both processes call the existence check; both see no existing row, both
call
db.session.add(subscription). The first flush succeeds; the second flush
raises
sqlalchemy.exc.IntegrityError due to unique constraint.
4. Observe the second request leaves db.session in a failed state (unhandled
IntegrityError), causing transaction/session failure and likely a 500
response; this is
reproducible from the function location and realistic under concurrent usage.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/daos/tasks.py
**Line:** 245:263
**Comment:**
*Race Condition: The subscriber addition logic checks for existence
before inserting, but under concurrent calls a unique constraint violation can
still occur on flush, raising an IntegrityError that leaves the session in a
failed state and breaks the transaction. The fix is to catch IntegrityError
around flush (after the add) and treat it as an "already subscribed" case while
rolling back the failed operation to keep the session usable.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
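One caveat with the suggested `db.session.rollback()`: a plain rollback discards the whole outer transaction, not just the failed insert, so a caller with pending work before `add_subscriber()` would lose it. A common alternative is to run the insert inside a SAVEPOINT so only the nested transaction is rolled back. This is a minimal sketch, assuming SQLAlchemy 1.4+ (where `Session.begin_nested()` works as a context manager) and a backend that supports SAVEPOINTs; `try_add_subscriber` and its parameters are hypothetical names for illustration only:

```python
# Hypothetical helper: attempt the insert under a SAVEPOINT so a duplicate
# (task_id, user_id) only rolls back the nested transaction, leaving the
# caller's outer transaction and session state intact.
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session


def try_add_subscriber(session: Session, subscription) -> bool:
    try:
        with session.begin_nested():  # emits SAVEPOINT
            session.add(subscription)
            session.flush()  # force the INSERT inside the savepoint
    except IntegrityError:
        # Another worker won the race; the context manager already rolled
        # back to the SAVEPOINT, so the session remains usable.
        return False
    return True
```

Whether that trade-off matters here depends on how SubmitTaskCommand manages its transaction, so the simpler rollback in the suggestion may also be acceptable.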
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]