villebro commented on code in PR #36368: URL: https://github.com/apache/superset/pull/36368#discussion_r2749742790
########## superset/models/tasks.py: ########## @@ -0,0 +1,370 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Task model for Global Task Framework (GTF)""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import Any, cast + +from flask_appbuilder import Model +from sqlalchemy import ( + Column, + DateTime, + Integer, + String, + Text, +) +from sqlalchemy.orm import relationship +from superset_core.api.models import Task as CoreTask +from superset_core.api.tasks import TaskProperties, TaskStatus + +from superset.models.helpers import AuditMixinNullable +from superset.models.task_subscribers import TaskSubscriber +from superset.tasks.utils import ( + error_update, + get_finished_dedup_key, + parse_properties, + serialize_properties, +) +from superset.utils import json + + +class Task(CoreTask, AuditMixinNullable, Model): + """ + Concrete Task model for the Global Task Framework (GTF). + + This model represents async tasks in Superset, providing unified tracking + for all background operations including SQL queries, thumbnail generation, + reports, and other async operations. + + Non-filterable fields (progress, error info, execution config) are stored + in a `properties` JSON blob for schema flexibility. + """ + + __tablename__ = "tasks" + + # Primary key and identifiers + id = Column(Integer, primary_key=True) + uuid = Column( + String(36), nullable=False, unique=True, default=lambda: str(uuid.uuid4()) + ) + + # Task metadata (filterable) + task_key = Column(String(256), nullable=False, index=True) # For deduplication + task_type = Column(String(100), nullable=False, index=True) # e.g., 'sql_execution' + task_name = Column(String(256), nullable=True) # Human readable name + scope = Column( + String(20), nullable=False, index=True, default="private" + ) # private/shared/system + status = Column( + String(50), nullable=False, index=True, default=TaskStatus.PENDING.value + ) + dedup_key = Column( + String(64), nullable=False, unique=True, index=True + ) # Hashed deduplication key (SHA-256 = 64 chars, UUID = 36 chars) + + # Timestamps + started_at = Column(DateTime, nullable=True) + ended_at = Column(DateTime, nullable=True) + + # User context for execution + user_id = Column(Integer, nullable=True) + + # Task-specific output data (set by task code via ctx.update_task(payload=...)) + payload = Column(Text, nullable=True, default="{}") + + # Properties JSON blob - contains runtime state and execution config: + # - is_abortable: bool - has abort handler registered + # - progress_percent: float - progress 0.0-1.0 + # - progress_current: int - current iteration count + # - progress_total: int - total iterations + # - error_message: str - human-readable error message + # - exception_type: str - exception class name + # - stack_trace: str - full formatted traceback + # - timeout: int - timeout in seconds + _properties = Column("properties", Text, nullable=True, default="{}") + + # Transient cache for parsed properties (not persisted) + _properties_cache: TaskProperties | None = None + + # Relationships + subscribers = relationship( + TaskSubscriber, + back_populates="task", + cascade="all, delete-orphan", + ) + + def __repr__(self) -> str: + return f"<Task {self.task_type}:{self.task_key} [{self.status}]>" + + # ------------------------------------------------------------------------- + # Properties accessor + # ------------------------------------------------------------------------- + + @property + def properties(self) -> TaskProperties: + """ + Get typed properties (cached for performance). + + Properties contain runtime state and execution config that doesn't + need database filtering. Parsed once and cached until next write. + + Always use .get() for reads since keys may be absent. + + :returns: TaskProperties dict (sparse - only contains keys that were set) + """ + if self._properties_cache is None: + self._properties_cache = parse_properties(self._properties) + return self._properties_cache Review Comment: This is precisely the point of the `update_properties` method and keeping the cached properties dict private. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
