villebro commented on code in PR #36368:
URL: https://github.com/apache/superset/pull/36368#discussion_r2748389049


##########
superset/commands/tasks/prune.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
+
+import sqlalchemy as sa
+from superset_core.api.tasks import TaskStatus
+
+from superset import db
+from superset.commands.base import BaseCommand
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class TaskPruneCommand(BaseCommand):
+    """
+    Command to prune the tasks table by deleting rows older than the specified
+    retention period.
+
+    This command deletes records from the `Task` table that are in terminal 
states
+    (success, failure, aborted, or timed_out) and have not been changed within 
the
+    specified number of days. It helps in maintaining the database by removing
+    outdated entries and freeing up space.
+
+    Attributes:
+        retention_period_days (int): The number of days for which records 
should be retained.
+                                     Records older than this period will be 
deleted.
+        max_rows_per_run (int | None): The maximum number of rows to delete in 
a single run.
+                                       If provided and greater than zero, rows 
are selected
+                                       deterministically from the oldest first 
(by timestamp then id)
+                                       up to this limit in this execution.
+    """  # noqa: E501
+
+    def __init__(self, retention_period_days: int, max_rows_per_run: int | 
None = None):
+        """
+        :param retention_period_days: Number of days to keep in the tasks table
+        :param max_rows_per_run: The maximum number of rows to delete in a 
single run.
+            If provided and greater than zero, rows are selected 
deterministically from the
+            oldest first (by timestamp then id) up to this limit in this 
execution.
+        """  # noqa: E501
+        self.retention_period_days = retention_period_days
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has a IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs that need to be deleted
+        # Only delete completed tasks (success, failure, or aborted)
+        from superset.models.tasks import Task
+
+        select_stmt = sa.select(Task.id).where(
+            Task.ended_at < datetime.now() - 
timedelta(days=self.retention_period_days),
+            Task.status.in_(
+                [
+                    TaskStatus.SUCCESS.value,
+                    TaskStatus.FAILURE.value,
+                    TaskStatus.ABORTED.value,
+                    TaskStatus.TIMED_OUT.value,
+                ]
+            ),
+        )
+
+        # Optionally limited by max_rows_per_run
+        # order by oldest first for deterministic deletion
+        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
+            select_stmt = select_stmt.order_by(
+                Task.ended_at.asc(), Task.id.asc()
+            ).limit(self.max_rows_per_run)
+
+        ids_to_delete = db.session.execute(select_stmt).scalars().all()
+
+        total_rows = len(ids_to_delete)
+
+        logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
+
+        next_logging_threshold = 1
+
+        # Iterate over the IDs in batches
+        for i in range(0, total_rows, batch_size):
+            batch_ids = ids_to_delete[i : i + batch_size]
+
+            # Delete the selected batch using IN clause
+            result = 
db.session.execute(sa.delete(Task).where(Task.id.in_(batch_ids)))
+
+            # Update the total number of deleted records
+            total_deleted += result.rowcount
+
+            # Explicitly commit the transaction given that if an error occurs, 
we want to ensure that the  # noqa: E501
+            # records that have been deleted so far are committed
+            db.session.commit()
+
+            # Log the number of deleted records every 1% increase in progress
+            percentage_complete = (total_deleted / total_rows) * 100
+            if percentage_complete >= next_logging_threshold:
+                logger.info(
+                    "Deleted %s rows from the tasks table older than %s days 
(%d%% complete)",  # noqa: E501
+                    f"{total_deleted:,}",
+                    self.retention_period_days,
+                    percentage_complete,
+                )
+                next_logging_threshold += 1
+
+        elapsed_time = time.time() - start_time
+        minutes, seconds = divmod(elapsed_time, 60)
+        formatted_time = f"{int(minutes):02}:{int(seconds):02}"
+        logger.info(
+            "Pruning complete: %s rows deleted in %s",
+            f"{total_deleted:,}",
+            formatted_time,
+        )
+
+    def validate(self) -> None:
+        pass

Review Comment:
   irrelevant nitpick



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to