rusackas commented on code in PR #40663:
URL: https://github.com/apache/superset/pull/40663#discussion_r3343671516


##########
superset/key_value/commands/prune.py:
##########
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import time
+from datetime import datetime
+
+import sqlalchemy as sa
+
+from superset import db
+from superset.commands.base import BaseCommand
+from superset.key_value.models import KeyValueEntry
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class KeyValuePruneCommand(BaseCommand):
+    """
+    Command to prune the key-value store by deleting entries whose expiry has
+    already passed.
+
+    The metastore-backed key-value store keeps an `expires_on` timestamp for
+    entries written with a timeout (for example, the metastore cache backend).
+    Unlike cache backends that evict on read, the metastore does not remove rows
+    on its own, so expired entries remain in the table until something deletes
+    them. This command performs that housekeeping by deleting every entry whose
+    `expires_on` is in the past, freeing up space in the table.
+
+    Attributes:
+        max_rows_per_run (int | None): The maximum number of rows to delete in a
+                                       single run. If provided and greater than
+                                       zero, rows are selected deterministically
+                                       from the oldest first by id up to this
+                                       limit in this execution.
+    """
+
+    def __init__(self, max_rows_per_run: int | None = None):
+        """
+        :param max_rows_per_run: The maximum number of rows to delete in a single run.
+            If provided and greater than zero, rows are selected deterministically from
+            the oldest first by id up to this limit in this execution.
+        """
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has a IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs whose expiry has already passed. Entries without an
+        # expiry (expires_on IS NULL) never expire and are left untouched.
+        select_stmt = sa.select(KeyValueEntry.id).where(
+            KeyValueEntry.expires_on.isnot(None),
+            KeyValueEntry.expires_on < datetime.now(),
+        )
+
+        # Optionally limited by max_rows_per_run
+        # order by oldest first for deterministic deletion
+        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
+            select_stmt = select_stmt.order_by(KeyValueEntry.id.asc()).limit(
+                self.max_rows_per_run
+            )
+
+        ids_to_delete = db.session.execute(select_stmt).scalars().all()
+
+        total_rows = len(ids_to_delete)
+
+        logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
+
+        next_logging_threshold = 1
+
+        # Iterate over the IDs in batches
+        for i in range(0, total_rows, batch_size):
+            batch_ids = ids_to_delete[i : i + batch_size]
+
+            # Delete the selected batch using IN clause
+            result = db.session.execute(
+                sa.delete(KeyValueEntry).where(KeyValueEntry.id.in_(batch_ids))
+            )

Review Comment:
   Went ahead and added this in the PR rather than deferring it. The command 
now captures a single `cutoff = datetime.now()` before selection and re-applies 
the expiry predicate (`expires_on IS NOT NULL AND expires_on <= cutoff`) on the 
batched DELETE, so an entry refreshed into the future between selection and 
deletion is no longer pruned. Added a regression test 
(`test_prune_skips_entry_refreshed_after_selection`) covering the TOCTOU window.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to