villebro commented on code in PR #36368: URL: https://github.com/apache/superset/pull/36368#discussion_r2748386846
########## superset/daos/tasks.py: ########## @@ -0,0 +1,313 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Task DAO for Global Task Framework (GTF)""" + +import logging +from datetime import datetime, timezone +from typing import Any + +from superset_core.api.tasks import TaskProperties, TaskScope, TaskStatus + +from superset.daos.base import BaseDAO +from superset.daos.exceptions import DAODeleteFailedError +from superset.extensions import db +from superset.models.task_subscribers import TaskSubscriber +from superset.models.tasks import Task +from superset.tasks.filters import TaskFilter +from superset.tasks.utils import get_active_dedup_key +from superset.utils.decorators import transaction + +logger = logging.getLogger(__name__) + + +class TaskDAO(BaseDAO[Task]): + """ + Concrete TaskDAO for the Global Task Framework (GTF). + + Provides database access operations for async tasks including + creation, status management, filtering, and subscription management + for shared tasks. 
+ """ + + base_filter = TaskFilter + + @classmethod + def find_by_task_key( + cls, + task_type: str, + task_key: str, + scope: TaskScope | str = TaskScope.PRIVATE, + user_id: int | None = None, + ) -> Task | None: + """ + Find active task by type, key, scope, and user. + + Uses dedup_key internally for efficient querying with a unique index. + Only returns tasks that are active (pending or in progress). + + Uniqueness logic by scope: + - private: scope + task_type + task_key + user_id + - shared/system: scope + task_type + task_key (user-agnostic) + + :param task_type: Task type to filter by + :param task_key: Task identifier for deduplication + :param scope: Task scope (private/shared/system) + :param user_id: User ID (required for private tasks) + :returns: Task instance or None if not found or not active + """ + dedup_key = get_active_dedup_key( + scope=scope, + task_type=task_type, + task_key=task_key, + user_id=user_id, + ) + + # Simple single-column query with unique index + return db.session.query(Task).filter(Task.dedup_key == dedup_key).one_or_none() Review Comment: Check a few lines above: `dedup_key` is generated by the `get_active_dedup_key` function, which implicitly means that only pending and in-progress tasks are in scope. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
