codeant-ai-for-open-source[bot] commented on code in PR #36368:
URL: https://github.com/apache/superset/pull/36368#discussion_r2722804849


##########
superset/commands/tasks/cancel.py:
##########
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unified cancel task command for GTF."""
+
+import logging
+from functools import partial
+from typing import TYPE_CHECKING
+
+from superset_core.api.tasks import TaskScope, TaskStatus
+
+from superset.commands.base import BaseCommand
+from superset.commands.tasks.exceptions import (
+    TaskAbortFailedError,
+    TaskNotAbortableError,
+    TaskNotFoundError,
+    TaskPermissionDeniedError,
+)
+from superset.utils.core import get_user_id
+from superset.utils.decorators import on_error, transaction
+
+if TYPE_CHECKING:
+    from superset.models.tasks import Task
+
+logger = logging.getLogger(__name__)
+
+
+class CancelTaskCommand(BaseCommand):
+    """
+    Unified command to cancel a task.
+
+    Behavior:
+    - For private tasks or single-subscriber tasks: aborts the task
+    - For shared tasks with multiple subscribers (non-admin): unsubscribes user
+    - For shared tasks with force=True (admin only): aborts for all subscribers
+
+    The term "cancel" is user-facing; internally this may abort or unsubscribe.
+    """
+
+    def __init__(self, task_uuid: str, force: bool = False):
+        """
+        Initialize the cancel command.
+
+        :param task_uuid: UUID of the task to cancel
+        :param force: If True, force abort even with multiple subscribers (admin only)
+        """
+        self._task_uuid = task_uuid
+        self._force = force
+        self._model: "Task" | None = None
+        self._is_admin: bool = False
+        self._action_taken: str = (
+            "cancelled"  # Will be set to 'aborted' or 'unsubscribed'
+        )
+
+    @transaction(on_error=partial(on_error, reraise=TaskAbortFailedError))
+    def run(self) -> "Task":
+        """
+        Execute the cancel command.
+
+        :returns: The updated task model
+        """
+        self.validate()
+        assert self._model
+
+        # Lazy import to avoid circular dependency
+        from superset.daos.tasks import TaskDAO
+
+        user_id = get_user_id()
+
+        # Determine action based on task scope and force flag
+        should_abort_directly = (
+            # Admin with force flag always aborts
+            (self._is_admin and self._force)
+            # Private tasks always abort (only one user)
+            or self._model.is_private
+            # System tasks always abort (admin only anyway)
+            or self._model.is_system
+            # Single or last subscriber - abort
+            or self._model.subscriber_count <= 1
+        )
+
+        if should_abort_directly:
+            # Direct abort
+            self._action_taken = "aborted"
+            try:
+                TaskDAO.abort_task(self._task_uuid, skip_base_filter=self._is_admin)
+            except TaskNotAbortableError:
+                raise
+

Review Comment:
   **Suggestion:** The code sets `_action_taken = "aborted"` before calling 
`TaskDAO.abort_task` and does not check the DAO's boolean return value; 
`abort_task` can return False (not aborted) without raising, so the command 
will incorrectly report the task as aborted. Capture the return value, handle 
False by raising `TaskAbortFailedError`, and only set `_action_taken` after a 
confirmed abort. [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Consumers receive false aborted confirmations.
   - ⚠️ UI may show wrong task status to users.
   ```
   </details>
   
   ```suggestion
               try:
                   aborted = TaskDAO.abort_task(self._task_uuid, skip_base_filter=self._is_admin)
               except TaskNotAbortableError:
                   raise
   
               if not aborted:
                   # Abort failed (task finished or DAO refused to abort); surface as abort failure
                   raise TaskAbortFailedError(f"Failed to abort task {self._task_uuid}")
   
               self._action_taken = "aborted"
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Prepare a task that will cause TaskDAO.abort_task(...) to return False:
   
      - Example: a shared task with subscriber_count == 1, is_shared == True, and
        status == TaskStatus.PENDING.value.
   
      - The DAO's early-return logic (superset/daos/tasks.py:200-211) logs and returns
        False when `task.is_shared and not skip_base_filter and task.subscriber_count > 0`.
   
   2. As the subscribing non-admin user, call the cancellation flow:
   
      - CancelTaskCommand('<TEST_UUID>').run() (superset/commands/tasks/cancel.py:69-133).
   
   3. The command evaluates should_abort_directly as True
      (superset/commands/tasks/cancel.py:83-93), sets self._action_taken = "aborted" at
      superset/commands/tasks/cancel.py:95-99, and calls TaskDAO.abort_task(...) at
      superset/commands/tasks/cancel.py:98-100.
   
   4. TaskDAO.abort_task(...) returns False (superset/daos/tasks.py:200-211) but does not
      raise. Because the command does not check the boolean return value, it continues as
      if the abort succeeded and returns. Consumers reading CancelTaskCommand.action_taken
      or the API response will see "aborted" even though the DAO did not abort the task.
   ```
   </details>
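
To illustrate the suggested ordering, here is a minimal, self-contained sketch. `FakeTaskDAO` and `cancel` are hypothetical stand-ins, not the Superset classes, and the local `TaskAbortFailedError` only mimics the real exception. The assumption, taken from the comment above, is that `abort_task` can signal a refusal by returning `False` rather than raising.

```python
class TaskAbortFailedError(Exception):
    """Local stand-in for the exception of the same name in the PR."""


class FakeTaskDAO:
    """Hypothetical DAO stub: abort_task returns a bool instead of raising."""

    def __init__(self, will_abort: bool) -> None:
        self.will_abort = will_abort

    def abort_task(self, task_uuid: str, skip_base_filter: bool = False) -> bool:
        # Assumption: False means "not aborted" (e.g. the DAO refused).
        return self.will_abort


def cancel(dao: FakeTaskDAO, task_uuid: str, is_admin: bool = False) -> str:
    """Return the action taken, recording "aborted" only after confirmation."""
    aborted = dao.abort_task(task_uuid, skip_base_filter=is_admin)
    if not aborted:
        # Surface the refusal instead of silently reporting success.
        raise TaskAbortFailedError(f"Failed to abort task {task_uuid}")
    return "aborted"
```

With this shape, a caller either gets `"aborted"` back or an exception; there is no path on which a refused abort is reported as a success.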



##########
superset-frontend/src/features/tasks/timeUtils.test.ts:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import {
+  formatDuration,
+  calculateEta,
+  formatProgressTooltip,
+} from './timeUtils';
+
+test('formatDuration returns null for invalid inputs', () => {
+  expect(formatDuration(null)).toBeNull();
+  expect(formatDuration(undefined)).toBeNull();
+  expect(formatDuration(0)).toBeNull();
+  expect(formatDuration(-5)).toBeNull();
+});
+
+test('formatDuration formats seconds correctly', () => {
+  expect(formatDuration(37.5)).toBe('37s');

Review Comment:
   **Suggestion:** The test expects formatDuration(37.5) to be '37s' but the 
underlying pretty-ms formatting rounds 37.5s to '38s' (half-up rounding). 
Update the expectation to match the library rounding so the test reflects 
actual runtime output. [logic error]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Unit test fails blocking CI merges.
   - ⚠️ Affects timeUtils test suite execution.
   - ⚠️ Prevents related PR from being green.
   ```
   </details>
   
   ```suggestion
     // pretty-ms rounds 37.5s to '38s' (half-up rounding), assert accordingly
     expect(formatDuration(37.5)).toBe('38s');
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Run Jest on superset-frontend/src/features/tasks/timeUtils.test.ts (the suite that
      contains the assertion at superset-frontend/src/features/tasks/timeUtils.test.ts:34).
   
   2. The test at timeUtils.test.ts:34 calls formatDuration(37.5), which is the exported
      function formatDuration in superset-frontend/src/features/tasks/timeUtils.ts
      (defined starting at timeUtils.ts:34).
   
   3. formatDuration multiplies seconds by 1000 and delegates to prettyMs with
      secondsDecimalDigits: 0 (timeUtils.ts:34-46). For 37.5 seconds pretty-ms performs
      half-up rounding and returns "38s" at runtime.
   
   4. The assertion at timeUtils.test.ts:34 expects "37s", causing a deterministic test
      failure ("Expected: '37s', Received: '38s'") when running the test locally or on CI,
      failing the unit test and the CI pipeline.
   ```
   </details>
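
Whether 37.5 seconds should format as '37s' or '38s' comes down to whether the formatter truncates or rounds half-up when no decimal digits are shown, so it is worth confirming against the installed pretty-ms version before changing the expectation. The sketch below is not pretty-ms code. It is a small Python illustration of the two candidate behaviours, with hypothetical function names.

```python
import math


def format_seconds_floor(seconds: float) -> str:
    """Drop the fractional part (truncate toward zero for positive input)."""
    return f"{math.floor(seconds)}s"


def format_seconds_half_up(seconds: float) -> str:
    """Round to the nearest whole second, with .5 rounding up."""
    return f"{math.floor(seconds + 0.5)}s"


print(format_seconds_floor(37.5))    # 37s
print(format_seconds_half_up(37.5))  # 38s
```

Inputs away from the .5 boundary, such as 37.2, produce the same string under both rules, so only boundary values like 37.5 distinguish them in a test.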



##########
superset/daos/tasks.py:
##########
@@ -0,0 +1,396 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task DAO for Global Task Framework (GTF)"""
+
+import logging
+import uuid
+from datetime import datetime
+from typing import Any
+
+from superset_core.api.tasks import TaskScope, TaskStatus
+
+from superset.daos.base import BaseDAO
+from superset.daos.exceptions import DAOCreateFailedError, DAODeleteFailedError
+from superset.extensions import db
+from superset.models.task_subscribers import TaskSubscriber
+from superset.models.tasks import Task
+from superset.tasks.filters import TaskFilter
+from superset.tasks.utils import get_active_dedup_key
+from superset.utils.core import get_user_id
+from superset.utils.decorators import transaction
+
+logger = logging.getLogger(__name__)
+
+
+class TaskDAO(BaseDAO[Task]):
+    """
+    Concrete TaskDAO for the Global Task Framework (GTF).
+
+    Provides database access operations for async tasks including
+    creation, status management, filtering, and subscription management
+    for shared tasks.
+    """
+
+    base_filter = TaskFilter
+
+    @classmethod
+    def find_by_task_key(
+        cls,
+        task_type: str,
+        task_key: str,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+    ) -> Task | None:
+        """
+        Find active task by type, key, scope, and user.
+
+        Uses dedup_key internally for efficient querying with a unique index.
+        Only returns tasks that are active (pending or in progress).
+
+        Uniqueness logic by scope:
+        - private: scope + task_type + task_key + user_id
+        - shared/system: scope + task_type + task_key (user-agnostic)
+
+        :param task_type: Task type to filter by
+        :param task_key: Task identifier for deduplication
+        :param scope: Task scope (private/shared/system)
+        :param user_id: User ID (required for private tasks)
+        :returns: Task instance or None if not found or not active
+        """
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=user_id,
+        )
+
+        # Simple single-column query with unique index
+        return db.session.query(Task).filter(Task.dedup_key == dedup_key).one_or_none()
+
+    @classmethod
+    @transaction()
+    def create_task(
+        cls,
+        task_type: str,
+        task_key: str | None = None,
+        scope: TaskScope | str = TaskScope.PRIVATE,
+        user_id: int | None = None,
+        **kwargs: Any,
+    ) -> Task:
+        """
+        Create a new async task with scope-aware deduplication.
+
+        For shared tasks, if a task with the same parameters already exists,
+        the current user is subscribed to it instead of creating a duplicate.
+
+        :param task_type: Type of task to create
+        :param task_key: Optional task identifier for deduplication
+        :param scope: Task scope (private/shared/system), defaults to private
+        :param user_id: User ID creating the task (required for subscription)
+        :param kwargs: Additional task attributes
+        :returns: Created or existing Task instance
+        :raises DAOCreateFailedError: If duplicate private task exists
+        """
+        from superset_core.api.tasks import TaskScope
+
+        from superset.tasks.utils import get_active_dedup_key
+
+        # Generate task_key if not provided
+        if task_key is None:
+            task_key = str(uuid.uuid4())
+
+        # Determine user_id early: use provided value or fall back to current user
+        effective_user_id = user_id if user_id is not None else get_user_id()
+
+        # Build dedup_key for active task
+        dedup_key = get_active_dedup_key(
+            scope=scope,
+            task_type=task_type,
+            task_key=task_key,
+            user_id=effective_user_id,
+        )
+
+        # Check for existing active task using dedup_key
+        if existing := cls.find_by_task_key(task_type, task_key, scope, user_id):
+            # For shared tasks, subscribe user to existing task
+            if (
+                scope == TaskScope.SHARED
+                and user_id
+                and not existing.has_subscriber(user_id)
+            ):
+                cls.add_subscriber(existing.id, user_id)
+                logger.info(
+                    "User %s subscribed to existing shared task: %s",
+                    user_id,

Review Comment:
   **Suggestion:** Logic bug: `create_task` calls `find_by_task_key(..., 
user_id)` with the original `user_id` parameter instead of the resolved 
`effective_user_id`. When `user_id` is None (and `get_user_id()` returns a 
value) this will either raise in dedup key construction for private tasks or 
miss matching an existing task and create duplicates; use `effective_user_id` 
consistently when checking for an existing active task and when subscribing. 
[logic error]
   
   <details>
   <summary><b>Severity Level:</b> Critical 🚨</summary>
   
   ```mdx
   - ❌ Task creation deduplication may create duplicate tasks.
   - ⚠️ Shared-task subscription logic may not subscribe user.
   - ⚠️ Unit tests for task deduplication may intermittently fail.
   ```
   </details>
   
   ```suggestion
           if existing := cls.find_by_task_key(task_type, task_key, scope, effective_user_id):
               # For shared tasks, subscribe user to existing task
               if (
                   scope == TaskScope.SHARED
                   and effective_user_id
                   and not existing.has_subscriber(effective_user_id)
               ):
                   cls.add_subscriber(existing.id, effective_user_id)
                   logger.info(
                       "User %s subscribed to existing shared task: %s",
                       effective_user_id,
   ```
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Call TaskDAO.create_task, defined at superset/daos/tasks.py:86 (the method starting
      at the @transaction decorator), without passing a user_id argument (i.e.,
      user_id=None). At superset/daos/tasks.py:116 the code resolves
      effective_user_id = user_id if user_id is not None else get_user_id()
      (get_user_id() is implemented at superset/utils/core.py:1392-1407).
   
   2. After effective_user_id is computed (superset/daos/tasks.py:116), the code builds a
      dedup_key from it (superset/daos/tasks.py:120-125 calls get_active_dedup_key in
      superset/tasks/utils.py:161-213). Immediately after, at superset/daos/tasks.py:128,
      the code checks for an existing active task by calling
      cls.find_by_task_key(..., user_id) with the original user_id parameter (None), not
      the resolved effective_user_id.
   
   3. This mismatch is triggered whenever callers omit user_id: create_task computes
      effective_user_id via get_user_id(), but the find check uses the original None
      value. It can be observed with a unit test that:
   
      - Calls TaskDAO.create_task(task_type="t", task_key="k", scope=TaskScope.PRIVATE)
        from a Flask request context where g.user.id is set (so get_user_id() returns an
        int) and asserts only one Task row is created.
   
      - Then calls TaskDAO.create_task(...) again (same args) and observes a duplicate
        task being created instead of being detected via find_by_task_key, because the
        find used the original user_id parameter.
   
   4. Outcome: the first call creates a task (superset/daos/tasks.py:165), while the
      second call may create a duplicate because find_by_task_key at
      superset/daos/tasks.py:128 used the wrong user identifier. If tests are run without
      a Flask request context (get_user_id() returns None), this code path can also cause
      inconsistent dedup_key checks or a ValueError inside
      superset/tasks/utils.py:161-213. Using effective_user_id at
      superset/daos/tasks.py:128 prevents the mismatch.
   ```
   </details>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

