codeant-ai-for-open-source[bot] commented on code in PR #36368: URL: https://github.com/apache/superset/pull/36368#discussion_r2737363958
########## superset/tasks/decorators.py: ########## @@ -0,0 +1,437 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Decorators for the Global Task Framework (GTF)""" + +import inspect +import logging +from typing import Callable, cast, Generic, ParamSpec, TYPE_CHECKING, TypeVar + +from superset_core.api.tasks import TaskOptions, TaskScope, TaskStatus + +from superset.tasks.ambient_context import use_context +from superset.tasks.context import TaskContext +from superset.tasks.manager import TaskManager +from superset.tasks.registry import TaskRegistry +from superset.tasks.utils import generate_random_task_key + +if TYPE_CHECKING: + from superset.models.tasks import Task + +logger = logging.getLogger(__name__) + +P = ParamSpec("P") +R = TypeVar("R") + + +def task( + func: Callable[P, R] | None = None, + *, + name: str | None = None, + scope: TaskScope = TaskScope.PRIVATE, + timeout: int | None = None, +) -> Callable[[Callable[P, R]], "TaskWrapper[P]"] | "TaskWrapper[P]": + """ + Decorator to register a task with default scope. + + Can be used with or without parentheses: + @task + def my_func(): ... + + @task() + def my_func(): ... + + @task(name="custom_name", scope=TaskScope.SHARED) + def my_func(): ... 
+ + @task(timeout=300) # 5-minute timeout + def long_running_func(): ... + + Args: + func: The function to decorate (when used without parentheses). + name: Optional unique task name (e.g., "superset.generate_thumbnail"). + If not provided, uses the function name as the task name. + scope: Task scope (TaskScope.PRIVATE, SHARED, or SYSTEM). + Defaults to TaskScope.PRIVATE. + timeout: Optional timeout in seconds. When the timeout is reached, + abort handlers are triggered if registered. Can be overridden + at call time via TaskOptions(timeout=...). + + Usage: + # Private task (default scope) - no parentheses + @task + def my_async_func(chart_id: int) -> None: + ctx = get_context() + ... + + # Named task with shared scope + @task(name="generate_report", scope=TaskScope.SHARED) + def generate_expensive_report(report_id: int) -> None: + ctx = get_context() + ... + + # System task (admin-only) + @task(scope=TaskScope.SYSTEM) + def cleanup_task() -> None: + ctx = get_context() + ... + + # Task with timeout + @task(timeout=300) + def long_task() -> None: + ctx = get_context() + + @ctx.on_abort + def handle_abort(): + # Called when timeout is reached or user cancels + ... + + Note: + Both direct calls and .schedule() return Task, regardless of the + original function's return type. The decorated function's return value + is discarded; only side effects and context updates matter. + """ + + def decorator(f: Callable[P, R]) -> "TaskWrapper[P]": + # Use function name if no name provided + task_name = name if name is not None else f.__name__ + + # Create default options with no scope (scope is now in decorator) + default_options = TaskOptions() + + # Validate function signature - must not have ctx or options params + sig = inspect.signature(f) + forbidden = {"ctx", "options"} + if any(param in forbidden for param in sig.parameters): + raise TypeError( + f"Task function {f.__name__} must not define 'ctx' or " + "'options' parameters. 
" + f"Use get_context() instead for ambient context access." + ) + + # Register task + TaskRegistry.register(task_name, f) + + # Create wrapper with schedule() method, default options, scope, and timeout + wrapper = TaskWrapper(task_name, f, default_options, scope, timeout) + + # Preserve signature for introspection + wrapper.__signature__ = sig # type: ignore[attr-defined] + + return wrapper + + if func is None: + # Called with parentheses: @task() or @task(name="foo", scope=TaskScope.SHARED) + return decorator + else: + # Called without parentheses: @task + return decorator(func) + + +class TaskWrapper(Generic[P]): + """ + Wrapper for task functions that provides .schedule() method. + + Both direct calls and .schedule() return Task. The original function's + return value is discarded. + + Direct calls execute synchronously, .schedule() runs async via Celery. + """ + + def __init__( + self, + name: str, + func: Callable[P, R], + default_options: TaskOptions, + scope: TaskScope = TaskScope.PRIVATE, + default_timeout: int | None = None, + ) -> None: + self.name = name + self.func = func + self.default_options = default_options + self.scope = scope + self.default_timeout = default_timeout + self.__name__ = func.__name__ + self.__doc__ = func.__doc__ + self.__module__ = func.__module__ + + # Patch schedule.__signature__ to mirror function + options parameter + # This enables proper IDE support and introspection + sig = inspect.signature(func) + params = list(sig.parameters.values()) + # Add keyword-only options parameter + params.append( + inspect.Parameter( + "options", + inspect.Parameter.KEYWORD_ONLY, + default=None, + annotation=TaskOptions | None, + ) + ) + self.schedule.__func__.__signature__ = sig.replace( # type: ignore[attr-defined] + parameters=params, return_annotation="Task" + ) + + def _merge_options(self, override_options: TaskOptions | None) -> TaskOptions: + """ + Merge decorator defaults with call-time overrides. 
+ + Call-time options take precedence over decorator defaults. + For timeout, an explicit None in TaskOptions disables the decorator timeout. + + Args: + override_options: Options provided at call time, or None + + Returns: + Merged TaskOptions with overrides applied + """ + if override_options is None: + return TaskOptions( + task_key=self.default_options.task_key, + task_name=self.default_options.task_name, + timeout=self.default_timeout, # Use decorator default + ) Review Comment: **Suggestion:** Runtime TypeError: the code calls TaskOptions(...) with keyword arguments, but the upstream TaskOptions class defines only class-level attributes and does not accept constructor kwargs; this will raise TypeError at runtime. Replace the direct constructor call with creating an instance and assigning attributes. [type error] <details> <summary><b>Severity Level:</b> Critical 🚨</summary> ```mdx - ❌ Synchronous task invocation crashes on no-options calls. - ❌ Task creation path (TaskWrapper.__call__) blocked. - ⚠️ Async scheduling also uses _merge_options; affected. ``` </details> ```suggestion opts = TaskOptions() opts.task_key = self.default_options.task_key opts.task_name = self.default_options.task_name opts.timeout = self.default_timeout # Use decorator default return opts ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Register a task using the decorator (decoration flow in superset/tasks/decorators.py:108-135). The decorator returns a TaskWrapper instance (created at superset/tasks/decorators.py:128-130). 2. Call the decorated function synchronously (invokes TaskWrapper.__call__ at superset/tasks/decorators.py:238). Inside __call__, override_options is extracted at superset/tasks/decorators.py:261 with: `override_options = cast(TaskOptions | None, kwargs.pop("options", None))`. 3. 
When the caller does not pass options (common case), override_options is None and __call__ calls self._merge_options(override_options) at superset/tasks/decorators.py:262; _merge_options executes the branch starting at superset/tasks/decorators.py:201 which attempts to call TaskOptions(...) with keyword args. 4. The upstream TaskOptions type (superset_core/src/superset_core/api/tasks.py:93-134) is a simple class with class-level attributes and no __init__ that accepts kwargs; calling TaskOptions(task_key=..., task_name=..., timeout=...) raises TypeError at runtime (e.g., "TaskOptions() takes no arguments"), causing any synchronous task invocation without explicit options to crash before task creation completes. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/tasks/decorators.py **Line:** 202:206 **Comment:** *Type Error: Runtime TypeError: the code calls TaskOptions(...) with keyword arguments, but the upstream TaskOptions class defines only class-level attributes and does not accept constructor kwargs; this will raise TypeError at runtime. Replace the direct constructor call with creating an instance and assigning attributes. Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details> ########## tests/integration_tests/tasks/commands/test_update.py: ########## @@ -0,0 +1,271 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest.mock import patch + +import pytest +from superset_core.api.tasks import TaskScope, TaskStatus + +from superset import db +from superset.commands.tasks import UpdateTaskCommand +from superset.commands.tasks.exceptions import ( + TaskNotFoundError, +) +from superset.daos.tasks import TaskDAO + + +@patch("superset.tasks.utils.get_current_user") +def test_update_task_success(mock_get_user, app_context, get_user, login_as) -> None: + """Test successful task update""" + admin = get_user("admin") + login_as("admin") + mock_get_user.return_value = admin.username + + # Create a task using DAO + task = TaskDAO.create_task( + task_type="test_type", + task_key="update_test", + scope=TaskScope.PRIVATE, + user_id=admin.id, + ) + task.created_by = admin + task.set_status(TaskStatus.IN_PROGRESS) + db.session.commit() + + try: + # Update the task status + command = UpdateTaskCommand( + task_uuid=task.uuid, + status=TaskStatus.SUCCESS.value, + ) + result = command.run() + + # Verify update + assert result.uuid == task.uuid + assert result.status == TaskStatus.SUCCESS.value + + # Verify in database + db.session.refresh(task) + assert task.status == TaskStatus.SUCCESS.value + finally: + # Cleanup + db.session.delete(task) + db.session.commit() + + +def test_update_task_not_found(app_context, login_as) -> None: + """Test update fails when task not found""" + login_as("admin") + + command = UpdateTaskCommand( + task_uuid="00000000-0000-0000-0000-000000000000", + status=TaskStatus.SUCCESS.value, + ) + + with pytest.raises(TaskNotFoundError): + command.run() + 
+ +@patch("superset.tasks.utils.get_current_user") +def test_update_task_forbidden(mock_get_user, app_context, get_user, login_as) -> None: + """Test update fails when user doesn't own task (via base filter)""" + admin = get_user("admin") + mock_get_user.return_value = admin.username Review Comment: **Suggestion:** In `test_update_task_forbidden` the mock for `get_current_user` is left returning the admin username while the test then logs in as `"gamma"`; this mismatch means the mocked current-user value does not match the authenticated user and the test won't exercise the intended access-control path. Update the mock to return the gamma username (or set the mock to the correct value immediately before invoking the update) so the test simulates the non-owner user correctly. [logic error] <details> <summary><b>Severity Level:</b> Critical 🚨</summary> ```mdx - ❌ test_update_task_forbidden fails in CI. - ⚠️ Access-control regression may be undetected. ``` </details> ```suggestion mock_get_user.return_value = get_user("gamma").username ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Open tests/integration_tests/tasks/commands/test_update.py and locate test_update_task_forbidden: the decorator is at line 83 and the mock set at line 87 (`mock_get_user.return_value = admin.username`). 2. The test then logs in as gamma at line 102: `login_as("gamma")`. 3. Run the test: `pytest tests/integration_tests/tasks/commands/test_update.py::test_update_task_forbidden -q`. 4. Because the mock is configured to return the admin username (line 87) but the test's authentication uses `login_as("gamma")` (line 102), the mocked `get_current_user` value does not match the logged-in test user. 
If the command under test (`UpdateTaskCommand`, constructed at lines 105-108 and executed inside the pytest.raises block at 111-112) relies on `get_current_user` for authorization, the test will not exercise the intended non-owner access path — the command may see the admin identity and the asserted TaskNotFoundError will not be triggered, causing the test to fail or be a false negative. The concrete lines referenced above show where the mismatch occurs. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** tests/integration_tests/tasks/commands/test_update.py **Line:** 87:87 **Comment:** *Logic Error:* In `test_update_task_forbidden` the mock for `get_current_user` is left returning the admin username while the test then logs in as `"gamma"`; this mismatch means the mocked current-user value does not match the authenticated user and the test won't exercise the intended access-control path. Update the mock to return the gamma username (or set the mock to the correct value immediately before invoking the update) so the test simulates the non-owner user correctly. Validate the correctness of the flagged issue. If correct, how can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
